Member since: 11-16-2015
Posts: 890
Kudos Received: 648
Solutions: 245
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 940 | 02-22-2024 12:38 PM |
| | 812 | 02-02-2023 07:07 AM |
| | 2160 | 12-07-2021 09:19 AM |
| | 3444 | 03-20-2020 12:34 PM |
| | 11646 | 01-27-2020 07:57 AM |
01-11-2018
04:42 PM
Correct: each instance of a processor gets at least one thread (up to Max Concurrent Tasks), so they execute concurrently. The "sequential" part is only that a given flow file is not sent to the second processor until the first processor has finished with it. The second processor could be processing flow file 1 while the first processor is already processing flow file 2. If you need strictly sequential processing, you may need to put your two commands into a shell script and call it from ExecuteStreamCommand. If instead you need to process multiple flow files at once, you might try a Jython script in an ExecuteScript processor where you call session.get(<batch size>), which returns a list of between 0 and <batch size> flow files (based on how many were available at the time of the call). Lastly, you might be able to use the Wait/Notify processors to create barrier synchronization.
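To illustrate the pipelining behavior described above (a toy two-stage example, not NiFi code): stage 2 can work on flow file 1 while stage 1 is already working on flow file 2, yet each flow file still finishes stage 1 before entering stage 2. A minimal Python sketch using threads and a queue as the "connection":

```python
import queue
import threading
import time

q12 = queue.Queue()   # "connection" between processor 1 and processor 2
done = queue.Queue()  # output of processor 2

def processor1():
    for ff in ["ff1", "ff2", "ff3"]:
        time.sleep(0.01)   # simulate work on this flow file
        q12.put(ff)        # hand off; processor 2 may start on it immediately
    q12.put(None)          # end-of-stream marker

def processor2():
    while True:
        ff = q12.get()
        if ff is None:
            break
        time.sleep(0.01)   # simulate work, overlapping with processor 1
        done.put(ff + "-processed")

t1 = threading.Thread(target=processor1)
t2 = threading.Thread(target=processor2)
t1.start(); t2.start()
t1.join(); t2.join()

results = [done.get() for _ in range(done.qsize())]
print(results)  # each flow file passed through both stages, in order
```

With a single queue and a single consumer, per-flow-file order is preserved even though the two stages run concurrently.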
01-09-2018
07:33 PM
3 Kudos
You have a few different kinds of transformations going on there:

1) Value -> category, such as is_rain, season, and is_rushHour
2) Hoisting values from nested fields (possibly renaming the field), such as wind_speed
3) Fetch on match, such as PM10 and sensor_id, using nested values when a particular value_type is "P1"

In NiFi, some processors are better at certain transformations than others. For the first kind, I was able to achieve this with EvaluateJsonPath followed by two UpdateAttribute processors:

- EvaluateJsonPath extracts the "timestamp" field into an attribute called "dt". I couldn't use the actual "dt" field because it appears to be a number of seconds since midnight or something; the date evaluates to 1/18/1970.
- UpdateAttribute 1 extracts the hour and month values from the timestamp. Note the use of the GMT time zone in the functions; you may need to set that to something different (or exclude it altogether).
- UpdateAttribute 2 performs the categorization logic by checking whether the month is 12-2 (1 = winter), 3-5 (2 = spring), 6-8 (3 = summer), or 9-11 (4 = fall), and whether the hour of day is between 8-9 AM or 5-6 PM for is_rushHour.

The Expression for rush.hour is as follows:

${hour:ge(8):and(${hour:le(9)}):ifElse(1,${hour:ge(17):and(${hour:lt(19)}):ifElse(1,0)})}

The Expression for season is as follows:

${month:gt(11):or(${month:lt(3)}):ifElse(1,
${month:ge(3):and(${month:lt(6)}):ifElse(2,
${month:ge(6):and(${month:lt(9)}):ifElse(3,4)})})}

JoltTransformJSON performs the other two types of transformation (with a "shift" spec), along with injecting the categorical fields using Expression Language (in a "default" spec), combined as a "Chain" spec:

[
{
"operation": "shift",
"spec": {
"timestamp": "timestamp",
"wind": {
"deg": "wind_deg",
"speed": "wind_speed"
},
"main": {
"humidity": "humidity",
"pressure": "pressure",
"temp": "temperature"
},
"location": {
"longitude": "long",
"latitude": "lat"
},
"sensordatavalues": {
"*": {
"value_type": {
"P1": {
"@(2,value)": "PM10",
"@(2,id)": "sensor_id"
}
}
}
},
"cod": {
"200": {
"#1": "isRain"
},
"*": {
"#0": "isRain"
}
}
}
},
{
"operation": "default",
"spec": {
"season": "${season}",
"is_rushHour": "${rush.hour}"
}
}
]

Note that I am assuming a "cod" value of 200 means rain and everything else does not. If there are other codes, you can add other blocks to the "cod" section of the spec after the "200" block; the "*" block handles anything not matched (basically an "else" clause). The flow is put together in the order described above.
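The categorization logic in those two Expressions can be sanity-checked outside NiFi. Here is a minimal Python sketch of the same season and rush-hour rules (the function names are mine, not part of the flow):

```python
def season(month):
    # Mirrors the Expression Language: months 12-2 -> 1 (winter),
    # 3-5 -> 2 (spring), 6-8 -> 3 (summer), 9-11 -> 4 (fall).
    if month > 11 or month < 3:
        return 1
    if 3 <= month < 6:
        return 2
    if 6 <= month < 9:
        return 3
    return 4

def is_rush_hour(hour):
    # hour:ge(8):and(hour:le(9)) or hour:ge(17):and(hour:lt(19))
    return 1 if (8 <= hour <= 9) or (17 <= hour < 19) else 0

print(season(1), season(4), season(7), season(10))          # 1 2 3 4
print(is_rush_hour(8), is_rush_hour(12), is_rush_hour(18))  # 1 0 1
```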
01-08-2018
10:05 PM
3 Kudos
Here, the MIME type for your flow file is set to text/plain; it is stored in an attribute on the flow file called "mime.type". You can use an UpdateAttribute processor to set "mime.type" to "application/json".
01-05-2018
06:09 PM
1 Kudo
With record-aware processors like ConvertRecord, you don't need SplitJson either; they can operate on a whole JSON array.
01-03-2018
04:11 PM
2 Kudos
In order to keep the dependency as "provided" and use DBCPService in your processor(s), you can set the parent NAR for your nifi-custom-nar module to nifi-standard-services-api-nar by adding the following dependency to your nifi-custom-nar's pom.xml:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <type>nar</type>
</dependency>

Using JdbcCommon is a different issue, as it is not part of an API but rather part of the nifi-standard-processors JAR. This NiFi dev mailing list thread explains some workarounds, such as using nifi-standard-nar as your NAR parent, or copying JdbcCommon into your project (the latter was done for HiveJdbcCommon, for that reason plus custom changes specific to Hive).
12-22-2017
03:30 PM
1 Kudo
Rather than GetFile, you could use ListenHTTP -> FetchFile -> PutHDFS. You can then POST to your configured endpoint with a header set to the filename; in ListenHTTP, set that header name as the value of the "HTTP Headers to receive as Attributes (Regex)" property. FetchFile would then be configured to use that attribute to fetch the file and pass it to PutHDFS.
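To illustrate the header-to-attribute handoff (a toy stand-in for ListenHTTP, not NiFi code), the sketch below starts a throwaway HTTP server that captures a "filename" header from a POST, the way the "HTTP Headers to receive as Attributes (Regex)" property would turn that header into a flow file attribute. The header name, path, and file path are made-up examples:

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import Request, urlopen

received = {}

class Handler(BaseHTTPRequestHandler):
    # Stand-in for ListenHTTP: capture the "filename" header so a
    # downstream step (FetchFile in the real flow) can use it.
    def do_POST(self):
        received["filename"] = self.headers.get("filename")
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):
        pass  # keep the example quiet

server = HTTPServer(("127.0.0.1", 0), Handler)  # port 0 = pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()

port = server.server_address[1]
req = Request(f"http://127.0.0.1:{port}/contentListener",
              data=b"", method="POST",
              headers={"filename": "/data/in/report.csv"})
urlopen(req).close()
server.shutdown()

print(received["filename"])  # the value FetchFile would read as an attribute
```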
12-14-2017
11:04 PM
1 Kudo
This is a known "feature" of the DB2 driver: it closes the ResultSet after the last row is retrieved, but our code doesn't know whether the last row has been retrieved yet, which is why we call ResultSet.next() to see if anything is left. According to this, you should be able to add allowNextOnExhaustedResultSet=1 either to your JDBC URL or as a user-defined property in your DBCPConnectionPool (via NIFI-3426, assuming you have NiFi 1.2.0 or later); then ResultSet.next() should not throw an exception and should instead finish successfully.
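For reference, the IBM DB2 JDBC driver accepts properties appended to the URL after the database name, colon-prefixed and semicolon-terminated. A hypothetical URL (host, port, and database name are made up) would look like:

```
jdbc:db2://dbhost:50000/MYDB:allowNextOnExhaustedResultSet=1;
```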
12-13-2017
03:05 PM
1 Kudo
As of NiFi 1.2.0 (via NIFI-3658), you can use ConvertRecord to achieve this. You would configure a CSVReader to match your CSV format (comma-delimited, plus other config options for the header line, quoting, etc.) and a JsonRecordSetWriter to output JSON. Prior to NiFi 1.2.0, you could use ConvertCSVToAvro -> ConvertAvroToJSON.
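Conceptually, ConvertRecord with a CSVReader and a JSON writer is doing the following (a plain-Python sketch with made-up sample data, not the actual processor code):

```python
import csv
import io
import json

# Sample CSV content with a header line, as a CSVReader would see it.
csv_text = "id,name,city\n1,Alice,Austin\n2,Bob,Boston\n"

# Parse each line into a record (dict keyed by the header fields)...
records = list(csv.DictReader(io.StringIO(csv_text)))

# ...then write the record set back out as a JSON array.
print(json.dumps(records))
```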
12-13-2017
01:00 PM
Can you share an example or two of the incoming JSON data, your configuration for EvaluateJsonPath, and an example of the flow file after MergeContent (perhaps setting the number of entries much lower so it fits here)?
12-09-2017
06:21 PM
This is great, thanks! Also, before too long there should be a NiFi processor and controller service to help with some of the session management (NIFI-4683).