Member since
07-30-2019
3406
Posts
1622
Kudos Received
1008
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 130 | 12-17-2025 05:55 AM | |
| 191 | 12-15-2025 01:29 PM | |
| 133 | 12-15-2025 06:50 AM | |
| 260 | 12-05-2025 08:25 AM | |
| 418 | 12-03-2025 10:21 AM |
04-01-2019
03:56 PM
@Isha Tiwari In order to merge FlowFiles that exist on multiple nodes in your cluster you are going to need to move all FlowFiles to one node. Apache NiFi 1.9.x versions introduced a new "Load Balanced" configuration option on dataflow connections. One of the options for the configurable "Load Balance Strategy" is "Single node". Setting this strategy will route all queued FlowFiles to one node in the cluster. You could set this on the connection feeding your Merge processor. - In Apache NiFi 1.8 and older you would need to use the PostHTTP processor (configured to send as FlowFile) to send all FlowFiles to a ListenHTTP processor running at one of your nodes URL (processor ill run on al nodes, but your postHTTP will only be configured with URL for one node). Problem with this solution is that if the target URL server goes down, your dataflow will stop working. - Thank you, Matt
... View more
03-29-2019
06:13 PM
@Isha Tiwari Did you change your Max bin age setting to a value higher than 1 minutes? Try setting it to 10 minutes. Is your NiFi a standalone instance or a NiFi cluster? Keep in mind that each Node in a NIFi cluster runs is own copy of the flow.xml.gz and works on it own set of FlowFiles. So the merge processor can only bin and merge the FlowFiles local to each node. Thanks, Matt
... View more
03-28-2019
05:05 PM
3 Kudos
@Isha Tiwari - The "Max" configuration properties do not govern how long a bin waits to be merged. The Merge based processors work as follows: - 1. The processor executes based upon the configured run Schedule. 2. At exact time of execution the Merge processor looks at what FlowFiles exist on Inbound connection that have not already been allocated to a Merge processor bin. 3. Those FlowFiles are then allocated to one or more bins. The max Bin size and Max number records create a ceiling for how many FlowFiles can be allocate to a bin. If a bin has reached one of these max values, additional FlowFile in this current execution start getting allocated to a new bin. 4. Once all FlowFiles from this current execution (Thread does not keep checking for new FlowFiles coming in to inbound connection. Those new FlowFiles would be handled by next execution) have been allocated to one or more bins, those bins are evaluated to see if they are eligible to be merged. In order to be eligible the bin must meet both minimum settings for size and number of records or the max bin age has been reached. In your case, a bin could be binned with only 20 records and 20 KB of size or if a bin has existed for at least 1 minute. - If you find your merging small bins consistently, changing the run schedule on your merge processor should help. This would allow more time between executions for FlowFiles to queue on the inbound connection. - IMPORTANT: Keep in mind that all FlowFiles allocated to bins are bing held in heap memory (swapping does not occur with bins). Specifically the FlowFile attributes/Metadata is the portion of the FlowFile held in heap memory. Your max records of 100,000 could result in considerable heap pressure. Using two Merge processors in series could achieve same result with lower heap usage. - I use MergeContent in following Article about connection queues as an example: https://community.hortonworks.com/articles/184990/dissecting-the-nifi-connection-heap-usage-and-perf.html - Thank you, Matt -If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more
03-27-2019
05:08 PM
@Lanic - With release of Apache NiFi 1.7 and HDF 3.2 in mid 2018, the ability in terminate threads still executing on a processor that is in a state of "stopping" is now possible. After changing state of processor from start to stop, you will see processor display red square. You should give the running threads an opportunity to complete their execution. If it appears the processor is just not going to stop (hung threads) you can right click on the processor and select "Terminate" from the context menu displayed as follows:
... View more
03-25-2019
12:48 PM
@Aaron R The only method you can use to systematically adjust the configured run Schedule of a NiFi processor is through the use of the NiFi rest API. You would achieve this through a series of invokeHTTP processors. - 1. Use invokeHTTP processor to stop another processor. 2. Use invokeHTTP processor to change "Run schedule" configuration on a processor. 3. Use InvokeHTTP processor to start processor again. (Note that the processor will immediately execute when you start it and then not execute again for newly configured "run schedule" duration.) - You would then need to repeat above process for all three of your sensor inputs. Since I do not know the full details of your use case, it is not clear if this solution will really meet you needs. However, I can tell that based on how NiFi was developed, this is the only method for adjusting this configuration without user manual intervention. - You may just be better off building a flow that sends a FlowFile to each of the sensors invokeHTTP processor to trigger their execution. The InvokeHTTP processor does accept an inbound connection. When setup this way the invokeHTTP processor configured "run schedule" controls how often the processor checks that inbound connection for a FlowFile. When a FlowFile exists it will execute. - When a FlowFile is generated (by your "Sensor monitor controller" for example), that FlowFile will be timestamped. First you would need to split that incoming JSON into three different FlowFiles (one for each sensor). You could build a flow that parses the various sensor setting in to NiFi FlowFile Attributes and then create a if/then loop to check when the FlowFile timestamp + sensor value is equal to or less than current time. When that loop exits you route the FlowFile to appropriate invokeHTTP processor to trigger it to collect the sensor data. - Now becomes the question of what do you want to do with "Original" FlowFile out of the invokeHTTP. You can discard it and expect a new sensor monitor input each time or you can reset its trigger time and loop it back to the start. You would also need to in this case build in a method in your flow to kick this looping FlowFile out once a new sensor input comes in. (perhaps using the DistributedMapCache to store a value you increment each time a new timer comes in and then have your looping FlowFiles fetch this value to see if it has changed and kick FlowFile out if it has). - Hope these ideas help you with your design decisions. - Thank you, Matt - If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more
03-22-2019
08:58 PM
@Mario Tigua The File Filter property in the listFile processor does not support NiFi Expression Language. If yo float your cursor of the question mark icon to the right of a property name it will display a pop-up window that will tell you if this property will support NiFi expression language. - This property expects a java regular expression instead. - Thank you, Matt - If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more
03-22-2019
08:39 PM
@Derek Calderon Unfortunately, a rolling restart will not work here. When you shutdown only one node another node in the cluster would get elected as cluster coordinator and still retain list of nodes known to be part of cluster. By shutting down all nodes you essential wipe the slate clean of what nodes are in cluster. On start nodes will come up, components (processors, controller services, reporting tasks. etc..) will return to last known state (running, stopped, etc..), FlowFiles will be loaded back in to last reported connection queue, and processing will continue. - One of the nodes will get elected as the cluster coordinator by Zookeeper and then other nodes will learn from ZK who was elected and start sending heartbeats directly to that elected cluster coordinator to join the cluster. As node join they will be added to list of connected nodes. - The dead node that never checks in will no longer be in the list. - No need to stop components before stopping NiFi. They will return to last known state on start. - Thank you, Matt
... View more
03-22-2019
07:14 PM
@Derek Calderon Since your NiFi cluster is waiting on a response from the offloading node, it will not give you an option to delete node from list thus reducing your cluster node count by one. - If you stop all your remaining nodes and then start them, he cluster will come back up with only those nodes which join during startup. The other node will no longer be present. - Thank you, Matt - If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more
03-22-2019
05:48 PM
1 Kudo
@Benjamin Bouret The invokeHTTP processor would require you to use a SSL context service when communicating with a secure (https) endpoint. The SSLContext service can be setup with only a truststore.jks if this is only a 1-way TLS connection that does not require client authentication. - You should be able to use openssl to get the complete public certificate chain from the target secured endpoint. From command line execute following command: Openssl s_client -connect <hostname>:<port> -showcerts - The return from this command will include one or more public certificates. each public certificate will start with and end with following: -----BEGIN CERTIFICATE-----
.....
-----END CERTIFICATE----- - Copy each certificate including the above two lines and write each to separate file with a .crt extension. for example: CA-1.crt - Then import each of these public certificates in to the truststore you want to use in your SSLContext service as follows: # keytool -import -alias <unique Alias name 1> -file CA-1.crt -keystore truststore.jks
# keytool -import -alias <unique Alias name 2> -file CA-2.crt -keystore truststore.jks
etc... - Make your your NiFi service user can read this file where ever you decide to place on each of your NiFi nodes. - Thank you, Matt - If you found this answer addressed your question, please take a moment to login in and click the "ACCEPT" link.
... View more
03-22-2019
12:36 PM
@sri chaturvedi - The question here what is the use case for needing to have a sequence number across entire cluster. Why not generate a sequential number per node to keep track of per node batches. Maybe use the NiFi hostname in the sequence number identifier? - Maybe you can share some more details on the full use case for these sequence numbers. Why you are generating and what they are being used for. - If you use the distributedMapCache, you could keep three different sequential number cached values (each node has its own sequence number stored in a cache entry by hostname. - You could then build a flow that fetches all three value add adds them together for you on an hourly/daily/weekly schedule? - Thank you, Matt
... View more