Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar

This is part 2 of a miniseries. Part 1 can be found here.

In part 1 of this miniseries, I showed how a simple line count of a text file can be done using NiFi's record processors. The QueryRecord processor that I used creates a new FlowFile for each query result, which is nice if you are only interested in the result. But what if I want to add the line count as an attribute to the original FlowFile?

QueryRecord sends the query result(s) to a relationship that is named after the custom attribute defining the query. The original FlowFile is routed to the relationship named "original". I want to match these two together and I am going to use the Wait/Notify processor pair.

The Wait processor holds a FlowFile until it receives a signal, which is marked by the presence of a defined value in a distributed cache. The Notify processor can set such a value when it receives a FlowFile. From the FlowFile that is processed by the Notify processor, attributes can be transferred to the FlowFile that is held by the Wait processor.

Here is the complete flow as an overview:

The GetFile and QueryRecord processors are configured the same way as in part 1. The line count result is sent to an ExtractText processor with the sole purpose of transferring the result to an attribute. The ExtractText processor's configuration looks like this:

Any sequence of 1 or more digits is going to be transferred from the FlowFile content to a dynamic attribute called numLines.

Next, I need to set up a cache service to share information between the Notify and Wait processors. In my prototype example, this is going to be a DistributedMapCache service that lives on my laptop. I reach the setup dialog through the Settings item in the Operate panel:

Here, I add a DistributedMapCacheServer, leaving all settings at the default values.

Next, I configure three settings in the Notify processor:

  1. The Release Signal Identifier should be something that enables me to match the line count uniquely to an original FlowFile. For now, I am going to use the file name of the file I picked up from the file system at the beginning. (This attribute is set automatically by the GetFile processor, and is still present in both the original FlowFile and the query result.) Hence, I set the Release Signal Identifier to ${filename}.
  2. The Distributed Cache Service is a client controller service that communicates with the cache server that I created earlier. I create a new DistributedMapClientService, and configure the hostname of the cache server to be localhost.
  3. AttributeCacheRegex is a regular expression filter: all attributes whose names match the regular expression will be transferred to the FlowFile that is being held in the corresponding Wait processor.

I connect the Success relationship from the ExtractText processor to the Notify processor, and auto-terminate all relationships that go out from the Notify processor.

Next, I add a Wait processor and connect it to the "original" relationship from the QueryRecord processor. The Wait processor is configured to match the Notify processor:

  1. Release Signal Identifier is, again, ${filename}.
  2. For Distributed Cache Service, I pick the previously configured cache client service from the list.

I transfer the Success relationship to a funnel, so I can peek into the result FlowFiles. Indeed, I get FlowFiles now with the original file content, and the line count as an attribute!

Note: This is only an exercise. For a real system, a lot more factors need to be taken into consideration, such as uniqueness of the cache signals, possible race conditions, and scalability. But the general approach should be clear from this simple example.

4,512 Views
0 Kudos
Comments
avatar
Rising Star

Hi,

Very interesting article.

In what day to day scenario would this be useful ?

Thx

This approach is useful for any case where you want to extract any information by querying the FlowFile content, and adding this information as an attribute value. You could produce something like an "EXIF record" for your data items.

avatar
Rising Star
Version history
Last update:
‎11-03-2017 08:01 PM
Updated by:
Former Member
Contributors