Created on 11-03-2017 08:01 PM
This is part 2 of a miniseries. Part 1 can be found here.
In part 1 of this miniseries, I showed how a simple line count of a text file can be done using NiFi's record processors. The QueryRecord processor that I used creates a new FlowFile for each query result, which is nice if you are only interested in the result. But what if I want to add the line count as an attribute to the original FlowFile?
QueryRecord sends each query result to a relationship named after the dynamic property that defines the query. The original FlowFile is routed to the relationship named "original". To match the two back together, I am going to use the Wait/Notify processor pair.
The Wait processor holds a FlowFile until it receives a release signal, that is, until a value with a matching signal identifier appears in a distributed cache. The Notify processor writes such a value whenever it receives a FlowFile. Attributes of the FlowFile processed by the Notify processor can be carried over to the FlowFile held by the Wait processor.
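To make the signal mechanism concrete, here is a toy, single-process model of the Wait/Notify handshake. It is not NiFi code: the distributed cache is a plain dict, a FlowFile is a dict with "content" and "attributes", and using the "filename" attribute as the release signal identifier is a hypothetical choice for illustration only.

```python
# Toy model of the Wait/Notify pattern (not NiFi code).
# Assumptions: the cache is a dict, FlowFiles are dicts, and the release
# signal identifier is read from a hypothetical attribute ("filename" below).

cache = {}  # stands in for the DistributedMapCacheServer

def notify(flowfile, signal_attr, attrs_to_cache):
    """Store a release signal plus selected attributes under the signal ID."""
    signal_id = flowfile["attributes"][signal_attr]
    cache[signal_id] = {k: flowfile["attributes"][k] for k in attrs_to_cache}

def wait(flowfile, signal_attr):
    """Return the FlowFile with cached attributes merged in, or None if no signal yet."""
    signal_id = flowfile["attributes"][signal_attr]
    signal = cache.pop(signal_id, None)  # this toy model consumes the signal
    if signal is None:
        return None  # in NiFi, the FlowFile would keep waiting
    return {"content": flowfile["content"],
            "attributes": {**flowfile["attributes"], **signal}}

# Usage: the original FlowFile and the query result share the same filename.
original = {"content": "a\nb\nc\n", "attributes": {"filename": "data.txt"}}
count_ff = {"content": "3", "attributes": {"filename": "data.txt", "numLines": "3"}}

print(wait(original, "filename"))  # None: no signal yet
notify(count_ff, "filename", ["numLines"])
released = wait(original, "filename")
print(released["attributes"])  # {'filename': 'data.txt', 'numLines': '3'}
```

The released FlowFile keeps the original content and gains only the attributes that the notifying side chose to cache.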
Here is the complete flow as an overview:
The GetFile and QueryRecord processors are configured the same way as in part 1. The line count result is sent to an ExtractText processor whose sole purpose is to move the result from the FlowFile content into an attribute. The ExtractText processor's configuration looks like this:
A sequence of one or more digits in the FlowFile content is extracted into a new attribute called numLines, which is defined as a dynamic property of ExtractText.
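The extraction step can be sketched in plain Python. This is a simplified stand-in for ExtractText, not its implementation: the pattern r"(\d+)" is an assumption (the original configuration is only shown as a screenshot), only the first match's first capture group is used, and the indexed attributes that ExtractText may also add are ignored. It also assumes the query result contains just the count; a header line that happens to contain digits would be matched first.

```python
import re

def extract_text(content: str, attributes: dict, properties: dict) -> dict:
    """Hypothetical ExtractText stand-in: each property name becomes an
    attribute holding the first capture group of the first regex match."""
    new_attrs = dict(attributes)  # leave the input attributes untouched
    for name, pattern in properties.items():
        match = re.search(pattern, content)
        if match:  # no match: no attribute (ExtractText would route to "unmatched")
            new_attrs[name] = match.group(1)
    return new_attrs

query_result = "42\n"  # hypothetical query output containing only the count
attrs = extract_text(query_result, {"filename": "data.csv"}, {"numLines": r"(\d+)"})
print(attrs)  # {'filename': 'data.csv', 'numLines': '42'}
```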
Next, I need to set up a cache service to share information between the Notify and Wait processors. In this prototype, it is a DistributedMapCache service running on my laptop. I reach the setup dialog through the Settings item in the Operate panel:
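Here, I add a DistributedMapCacheServer, leaving all settings at their default values. The Notify and Wait processors connect to this server through a DistributedMapCacheClientService.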
Next, I configure three settings in the Notify processor:
I connect the matched relationship from the ExtractText processor to the Notify processor, and auto-terminate all of the Notify processor's outgoing relationships.
Next, I add a Wait processor and connect it to the "original" relationship of the QueryRecord processor. The Wait processor is configured to match the Notify processor, so that it looks for the same release signal in the same cache:
I route the Wait processor's success relationship to a funnel so I can inspect the resulting FlowFiles. Indeed, the FlowFiles now carry the original file content and the line count as an attribute!
Note: This is only an exercise. A real system would need to take many more factors into account, such as the uniqueness of the cache signals, possible race conditions, and scalability. Still, this simple example should make the general approach clear.
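Why signal uniqueness matters can be shown with a toy model. It is not NiFi code: the cache is a dict with last-writer-wins semantics for the cached value, which simplifies how NiFi actually stores signals, and the signal IDs "line-count", "A.csv" and "B.csv" are hypothetical.

```python
# Toy illustration (not NiFi code): a shared signal ID lets results collide.
cache = {}

def notify(signal_id, num_lines):
    cache[signal_id] = num_lines  # last writer wins in this simplified model

def wait(signal_id):
    return cache.get(signal_id)

# Two files in flight at once, both using a constant signal ID:
notify("line-count", "10")   # count for file A
notify("line-count", "250")  # count for file B overwrites it
print(wait("line-count"))    # file A would be tagged with "250"

# A signal ID derived from each original FlowFile keeps the results apart:
notify("A.csv", "10")
notify("B.csv", "250")
print(wait("A.csv"), wait("B.csv"))  # 10 250
```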
Created on 11-05-2017 11:15 PM
Hi,
Very interesting article.
In what day-to-day scenario would this be useful?
Thx
Created on 11-06-2017 01:31 PM
This approach is useful whenever you want to extract information by querying the FlowFile content and add that information as an attribute value. You could produce something like an "EXIF record" for your data items.
Created on 11-07-2017 10:16 PM
Thanks @Hellmar Becker