Member since
‎08-03-2019
186
Posts
34
Kudos Received
26
Solutions
‎07-02-2018
06:28 PM
1 Kudo
In this article, I will discuss one of the most exciting new features coming with NiFi 1.7.0 is the possibility of terminating running threads from the NiFi UI. I can think of a couple of examples in the previous NiFi versions where we either had to wait for a running thread to end before being able to make any changes to the processor configs or, in worst case scenarios, restart the NiFi cluster because of some thread is in a deadlock condition. For example An ExecuteSQL processor is stuck since the source RDBMS is not able to handle the data pull and has yielded under pressure. Or the other processes are not able to use the RDBMS since resources are hogged by this complete database scan operation. Either we wait for a literally infinite period of time or if the problem is serious, stop the cluster all together. Some custom script/processor has a deadlock situation and the thread won't stop ever. The only option we have in this scenario was to restart the machine/cluster running that process. Thanks to NiFi 1.7.0, now we have a more elegant solution to these kinds of problems, Terminate the thread from the UI itself. Follows a quick example of how we can do it. So for my flow, I created a sample flow with a GenerateFlowFile processor which is running continuously on all the possible nodes, a single one, in this case, my Mac 🙂 I have made the thread to run for longer once initiated and hence, even if I stop the processor, the thread will still keep on running. Have a look into the snapshots below. When I stopped the processor, the number of thread increase from 1 to 2, since now the thread to stop the processor is waiting for actively running thread. But with this new version of NiFi, NiFi 1.7.0, we have this option of terminating the threads explicitly from the UI itself, see snapshot #2 for the Terminate option. When the Terminate option is chosen Interrupt for the thread will be issued. A new instance of the processor will be created. The old instance will be eventually shut down. So here we are! With the new power to interrupt the threads from the NiFi UI. But please be careful! With greater powers, come greater responsibilities! I will add more information on what can be the probable issues, if any, of stopping the threads in between. Please feel free to leave comments to let know about the flow and for questions and queries. Hope that helps!
... View more
Labels:
‎05-11-2018
02:30 AM
2 Kudos
In this article, I'm going to cover a simple solution to control the data processing in NiFi serially or based on an event trigger. This article use Wait and Notify processors and DistributedMapCache controller service to achieve this functionality. Follows the links to the Usage Guide of these processors and Controller Services. Wait Processor Notify Processor DistributedMapCache Server DistributedMapCache client service NiFi Flow The flow that I am going to present in this article has a very simple use case. Get the data from the source, at whatever rate, but process it one flow file at a time. Once a flow file is processed, it should trigger the processing of the next flow file. Follows a step by step explanation of what I have implemented to solve this problem. First, the pre-requisites to start the flow. We will be storing the DistributedMapCache to store and retrieve the "signals" to process the flow files and hence we would need the Server and Client Service for that. Follows a quick description. DistributedMapCacheServer We will be storing some information about the flow file which has been processed, which will help us to trigger the next flow file processing. To do so, we will be using the DistributedMapCacheServer controller service. It provides a map (key/value) cache that can be accessed over a socket. Interaction with this service is typically accomplished via a DistributedMapCacheClient service, which is discussed below. Follows a snapshot of this controller service. Nothing fancy about it and I have used the default settings for it. DistributedMapCacheClientService Now to access the DistributedMapCacheServer, hosted at port 4557 above, we need a client service. This will aid us in storing and retrieving the cache. I am keeping it simple and leaving the default settings again for simplicity. Now the NiFI flow details Follows the snapshot of the flow for a quick overview Data Generation For this use case, I am generating the data using a GenerateFlowFile processor. Flow file tagging This is an important part of the processing. Here I am using an UpdateAttribute processor. This processor assigns a token to each flow file, which is incremented by 1 every time a flow file passes through it. The important part is that we are storing the state of this token variable and hence are able to assign a unique and auto incremented value to each of our flow files. This token will help us process the data in a serial fashion. Follows a snapshot of this processor. Tagged? Now let's make them wait! Once the flow files are tagged, they are redirected to the Wait processor. This processor makes the flow files to wait and don't release them until a matching release signal is stored in Distributed Cache. Have a look at the configuration of the Wait processor. We are looking at the DistributedMapCache Server for a counter called tokenCounter and when the value of tokenCounter will equal the value of Release Signal Identifier, which is the token number of the flow file, in this case, it will release that flow file. So how does the DistributedMapCache get this tokenNumber? If you look at the NiFi flow, before Wait processor, we have the RouteonAttribute. This is just for to handle the very first flow file. It will redirect the flow file with token #1 to the Notify processor. The Notify processor picks the value from the token attribute and stores it in the DistributedMapCache for against the key tokenCounter. This will instruct the Wait processor to release the flow file with token #1 for further processing. What's next? Next, the desired processing can be done on the flow file and once done, simple one up the token attributed and feed it to the Notify processor to release the next flow file. For example, flow file with token #1, once processed, will be updated to increment the token # to 2 and then sent to Notify processor. This will trigger the release of the file with token #2 by the Wait processor and cycle will go on. So here we are! With our flow to control the processing of our data according to our need in NiFi. Please feel free to leave comments to let know about the flow and for questions and queries. Hope that helps!
... View more
Labels: