Member since: 11-16-2015
Posts: 905
Kudos Received: 666
Solutions: 249
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 479 | 09-30-2025 05:23 AM |
| | 805 | 06-26-2025 01:21 PM |
| | 732 | 06-19-2025 02:48 PM |
| | 906 | 05-30-2025 01:53 PM |
| | 11588 | 02-22-2024 12:38 PM |
01-30-2018
12:59 PM
2 Kudos
What version of NiFi are you using? The timezone parameter was added in NiFi 1.2.0 / HDF 3.0 (NIFI-2908).
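Assuming you mean the optional time zone argument on the format() and toDate() Expression Language functions, a 1.2.0+ expression looks like:

${now():format('yyyy-MM-dd HH:mm:ss', 'America/New_York')}

Without the second argument, the function uses the system time zone.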
01-24-2018
04:57 PM
So right now it appears you are trying to do validation and extraction at the same time, since you don't want "case 2" to move downstream. If your new ReplaceText from this comment is more performant than the one from the original question, you can use RouteOnContent first to exclude the files that do not have the required header and footer. Since there would then be two pattern-matching processors, you may find it is less performant overall, but it's probably worth a try. Another option is ExecuteScript with a fast scripting language like Groovy or JavaScript/Nashorn, but the overhead of the interpreted script might outweigh the gain from looking only for headers/footers rather than applying a whole regex.
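A rough sketch of the ExecuteScript route (Groovy, with hypothetical HEADER/FOOTER marker strings standing in for your real ones):

import org.apache.nifi.processor.io.InputStreamCallback

// Route flow files that are missing the expected header/footer to failure,
// without running the full extraction regex over the content.
def flowFile = session.get()
if (!flowFile) return

def text = ''
session.read(flowFile, { inputStream ->
    text = inputStream.getText('UTF-8')
} as InputStreamCallback)

def lines = text.readLines()
def hasHeader = lines && lines.first().startsWith('HEADER')   // placeholder marker
def hasFooter = lines && lines.last().startsWith('FOOTER')    // placeholder marker

session.transfer(flowFile, (hasHeader && hasFooter) ? REL_SUCCESS : REL_FAILURE)

This still reads the whole content into memory, so it is only a starting point; for very large files you would want to read just the first and last chunks of the stream.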
01-23-2018
07:00 PM
1 Kudo
How is your data coming into NiFi? If it is a single flow file with all the rows (such as the output of ExecuteSQL, which returns an Avro file with the records in it), then you can use SplitAvro, and downstream each flow file can be processed separately with no looping required. If your input is a text file you can use SplitText; if JSON, then SplitJson, etc. If instead you have a number (say 10) and you need to fetch the rows with ids 1-10, you can simply use ExecuteSQL with a query that selects those rows. If I am misunderstanding your use case and you do need to loop, then after you get your loop variable into an attribute (perhaps with EvaluateJsonPath as you mention), you can use RouteOnAttribute to check whether it is time to exit the loop (${loopVariable:gt(0)} for example). Otherwise you can use UpdateAttribute to increment or decrement the counter, and send that output back to the beginning of the loop.
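As a minimal sketch of the looping pieces (the attribute and property names here are just illustrative), the counter check and the decrement could look like:

RouteOnAttribute (Routing Strategy: Route to Property name)
  keep.looping = ${loopVariable:gt(0)}

UpdateAttribute (on the keep.looping relationship, wired back to the start of the loop)
  loopVariable = ${loopVariable:minus(1)}

When the counter reaches 0, the flow file falls through to the unmatched relationship and exits the loop.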
01-23-2018
06:24 PM
You can use either ConvertRecord or ConvertAvroToJSON to convert your incoming Avro data to JSON. If the incoming Avro files do not have a schema embedded in them, then you will have to provide it, either to an AvroReader (for ConvertRecord) or the "Avro schema" property (for ConvertAvroToJSON).
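If you do have to supply the schema yourself (to the AvroReader's schema settings, or to ConvertAvroToJSON's "Avro schema" property), it is plain Avro schema JSON; a made-up example:

{
  "type": "record",
  "name": "MyRecord",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": ["null", "string"] }
  ]
}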
01-22-2018
06:32 PM
1 Kudo
Perhaps try ReplaceText first, to match your beginning and end text and replace them with an empty string. Then, if you need the content as an attribute, you can use ExtractText with (.*). Do you definitely need the value in an attribute? If not, you can just keep it in the content after the ReplaceText processor and skip the ExtractText step.
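A sketch of the ReplaceText settings for stripping the markers (BEGIN and END are placeholders for your actual beginning and end text, and Entire text mode assumes the content fits comfortably in memory):

ReplaceText
  Replacement Strategy: Regex Replace
  Evaluation Mode:      Entire text
  Search Value:         BEGIN|END
  Replacement Value:    (set to the empty string)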
01-11-2018
04:42 PM
Correct, each instance of a processor gets at least one thread (up to Max Concurrent Tasks), so they will execute concurrently. The sequential part is just that a flow file will not be sent to the second processor until it has finished processing in the first. But the second processor could be processing the first flow file while the first processor is processing the second flow file (if that makes sense). If you need pure sequential processing, you may need to put your two commands into a shell script and call it from ExecuteStreamCommand. Or, if you need to process multiple flow files at once, you might try a Jython script in an ExecuteScript processor where you do a session.get(<batch size>), which returns a list of flow files of size somewhere between 0 and your batch size (based on how many were available at the time of the get). Lastly, you might be able to use the Wait/Notify processors to create barrier synchronization.
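A minimal ExecuteScript sketch of the batched get (shown in Groovy for brevity; the same session.get(int) call is available from Jython), with the actual processing left as a placeholder:

// Pull up to 10 flow files in one invocation and handle them together.
def flowFiles = session.get(10)   // returns a list of 0..10 flow files
if (flowFiles.isEmpty()) return

flowFiles.each { flowFile ->
    // ... per-flow-file (or cross-flow-file) work goes here ...
    session.transfer(flowFile, REL_SUCCESS)
}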
01-09-2018
07:33 PM
3 Kudos
You have a few different kinds of transformations going on there:
1) Value -> Category, such as is_rain, season, and is_rushHour
2) Hoisting values from nested fields (possibly renaming the field), such as wind_speed
3) Fetch on match, such as PM10 and sensor_id, using nested values when a particular value (here, value_type) is "P1"

In NiFi, some processors are better at certain transformations than others. For the first kind, I was able to achieve this with EvaluateJsonPath followed by two UpdateAttribute processors:
- EvaluateJsonPath extracts the "timestamp" field into an attribute called "dt". I couldn't use the actual "dt" field because it appears to be a number of seconds since midnight or something; the date evaluates to 1/18/1970.
- UpdateAttribute 1 extracts the hour and month values from the timestamp into "hour" and "month" attributes. Note the use of the GMT time zone in the functions; you may need to set that to something different (or exclude it altogether). A sketch of these expressions follows the Jolt spec below.
- UpdateAttribute 2 performs the categorization logic, by checking if the month is between 12 and 2 (1 = winter), 3-5 (2 = spring), 6-8 (3 = summer), or 9-11 (4 = fall), and also if the hour of day is between 8-9 AM or 5-6 PM for is_rushHour.

The Expression for rush.hour is as follows:
${hour:ge(8):and(${hour:le(9)}):ifElse(1,${hour:ge(17):and(${hour:lt(19)}):ifElse(1,0)})}

The Expression for season is as follows:
${month:gt(11):or(${month:lt(3)}):ifElse(1,
${month:ge(3):and(${month:lt(6)}):ifElse(2,
${month:ge(6):and(${month:lt(9)}):ifElse(3,4)})})}

- JoltTransformJSON performs the other two types of transformation (with a "shift" spec), along with injecting the categorical fields using Expression Language (in a "default" spec), combined as a "Chain" spec:
[
{
"operation": "shift",
"spec": {
"timestamp": "timestamp",
"wind": {
"deg": "wind_deg",
"speed": "wind_speed"
},
"main": {
"humidity": "humidity",
"pressure": "pressure",
"temp": "temperature"
},
"location": {
"longitude": "long",
"latitude": "lat"
},
"sensordatavalues": {
"*": {
"value_type": {
"P1": {
"@(2,value)": "PM10",
"@(2,id)": "sensor_id"
}
}
}
},
"cod": {
"200": {
"#1": "isRain"
},
"*": {
"#0": "isRain"
}
}
}
},
{
"operation": "default",
"spec": {
"season": "${season}",
"is_rushHour": "${rush.hour}"
}
}
]

Note that I am assuming a "cod" value of 200 is rain and everything else is not. If there are other codes, you can add other blocks to the "cod" section of the spec after the 200 block. The * block handles anything not matched (basically an "else" clause). The flow is put together in the aforementioned order: EvaluateJsonPath -> UpdateAttribute -> UpdateAttribute -> JoltTransformJSON.
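For the hour/month extraction in UpdateAttribute 1, a rough sketch of the two expressions, assuming the extracted "dt" attribute holds a timestamp like 2017-12-28T08:15:00 (adjust the format string and time zone to your data):

hour  = ${dt:toDate("yyyy-MM-dd'T'HH:mm:ss", "GMT"):format("HH", "GMT")}
month = ${dt:toDate("yyyy-MM-dd'T'HH:mm:ss", "GMT"):format("MM", "GMT")}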
01-08-2018
10:05 PM
3 Kudos
Here the MIME type for your flow file is set to text/plain; it is stored in an attribute on the flow file called "mime.type". You can use an UpdateAttribute processor to set "mime.type" to "application/json".
01-05-2018
06:09 PM
1 Kudo
With record-aware processors like ConvertRecord, you don't need SplitJson either; they can work on a whole JSON array.
01-03-2018
04:11 PM
2 Kudos
In order to keep the dependency as provided and use DBCPService in your processor(s), you can set the parent NAR for your nifi-custom-nar module to "nifi-standard-services-api-nar", by adding the following as a dependency to your nifi-custom-nar's pom.xml:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <type>nar</type>
</dependency>

Using JdbcCommon is a different issue, as it is not part of an API but rather the nifi-standard-processors JAR. This NiFi dev mailing list thread explains some workarounds, such as using nifi-standard-nar as your NAR parent, or copying JdbcCommon into your project (the latter was done for HiveJdbcCommon for that reason, plus custom changes specific to Hive).