Created on 05-11-2018 02:30 AM - edited 08-17-2019 07:32 AM
In this article, I'm going to cover a simple solution to control the data processing in NiFi serially or based on an event trigger.
This article use Wait and Notify processors and DistributedMapCache controller service to achieve this functionality. Follows the links to the Usage Guide of these processors and Controller Services.
DistributedMapCache client service
NiFi Flow
The flow that I am going to present in this article has a very simple use case. Get the data from the source, at whatever rate, but process it one flow file at a time. Once a flow file is processed, it should trigger the processing of the next flow file. Follows a step by step explanation of what I have implemented to solve this problem.
First, the pre-requisites to start the flow.
We will be storing the DistributedMapCache to store and retrieve the "signals" to process the flow files and hence we would need the Server and Client Service for that. Follows a quick description.
We will be storing some information about the flow file which has been processed, which will help us to trigger the next flow file processing. To do so, we will be using the DistributedMapCacheServer controller service. It provides a map (key/value) cache that can be accessed over a socket. Interaction with this service is typically accomplished via a DistributedMapCacheClient service, which is discussed below. Follows a snapshot of this controller service. Nothing fancy about it and I have used the default settings for it.
Now to access the DistributedMapCacheServer, hosted at port 4557 above, we need a client service. This will aid us in storing and retrieving the cache. I am keeping it simple and leaving the default settings again for simplicity.
Now the NiFI flow details
Follows the snapshot of the flow for a quick overview
For this use case, I am generating the data using a GenerateFlowFile processor.
This is an important part of the processing. Here I am using an UpdateAttribute processor. This processor assigns a token to each flow file, which is incremented by 1 every time a flow file passes through it. The important part is that we are storing the state of this token variable and hence are able to assign a unique and auto incremented value to each of our flow files. This token will help us process the data in a serial fashion. Follows a snapshot of this processor.
Once the flow files are tagged, they are redirected to the Wait processor. This processor makes the flow files to wait and don't release them until a matching release signal is stored in Distributed Cache. Have a look at the configuration of the Wait processor. We are looking at the DistributedMapCache Server for a counter called tokenCounter and when the value of tokenCounter will equal the value of Release Signal Identifier, which is the token number of the flow file, in this case, it will release that flow file.
If you look at the NiFi flow, before Wait processor, we have the RouteonAttribute. This is just for to handle the very first flow file. It will redirect the flow file with token #1 to the Notify processor. The Notify processor picks the value from the token attribute and stores it in the DistributedMapCache for against the key tokenCounter. This will instruct the Wait processor to release the flow file with token #1 for further processing.
Next, the desired processing can be done on the flow file and once done, simple one up the token attributed and feed it to the Notify processor to release the next flow file. For example, flow file with token #1, once processed, will be updated to increment the token # to 2 and then sent to Notify processor. This will trigger the release of the file with token #2 by the Wait processor and cycle will go on.
So here we are! With our flow to control the processing of our data according to our need in NiFi.
Please feel free to leave comments to let know about the flow and for questions and queries.
Hope that helps!