Member since
Kudos Received
My Accepted Solutions
Title | Views | Posted |
106 | 01-09-2025 11:14 AM | |
672 | 01-03-2025 05:59 AM | |
396 | 12-13-2024 10:58 AM | |
429 | 12-05-2024 06:38 AM | |
359 | 11-22-2024 05:50 AM |
03:43 PM
1 Kudo
Just to clarify on how S2S works when communicating with a target NiFi cluster. The NCM never receives any data so it cannot act as the load-balancer. When the source NiFi communicates with the NCM, the NCM returns a list of all currently connected nodes and there S2S ports along with the current load on each node to the source NiFi. It is then the job of the source NiFi RPG to use that information to do a smart load-balanced delivery of data to those nodes.
... View more
03:04 PM
Anything you can do via the browser can be done my making calls to the NiFi-API. You could either setup an external process to run a couple curl commands to start and they stop the GetTwitter processor in your flow or you could us a couple invokeHTTP processors in your dataflow (configured using the cron scheduling strategy) to start and stop the GetTwitter processor on a given schedule. Matt
... View more
02:14 PM
1 Kudo
@INDRANIL ROY What you describe is a very common dataflow design. I have a couple question for clarity. RPG (Remote Process Group) do not send to other RPGs. RPG send and pull data from input and output ports located on other NiFi instances. I suspect your standalone instance has the RPG and it is sending FlowFiles to input port(s) on the destination NiFi cluster. In this particular case the load-balancing of data is being handled by the RPG. For network efficiency data is distributed in batches, so you may not see with light dataflows and exact same number of FlowFiles going to each Node. Also the Load-balancing has logic built in to it so that Node in the target cluster who have less work load get more FlowFiles. Although the URL provided to the RPG is the URL for the target BNiFi cluster's NCM, the FlowFiles are not sent to the NCM, but rather sent directly to the connected nodes in the target cluster. Every Node in a NiFi cluster operates independently of one another working only on the FlowFiles it possesses. Nodes do not communicate with one another. They simply report their health and status back to the NCM. It is information from those health and status heartbeats that is sent back to the source RPG and used by that RPG to do the smart data delivery. In order to distribute the fetching of the source data, the source directory would need to be reachable by all nodes in the target NiFi cluster. In the case of ListFile/FetchFile, the directory would need to mounted identically to all systems. Another option would be to switch to a listSFTP/FetchSFTP setup. In this setup you would not even need your standalone NiFi install. You could simply add a listSFTP processor to your cluster (configured to run "on primary node"). Then take the success from that listing and feed it to an RPG that points back at the clusters NCM URL. An input port would be used to receive the now load-balanced FlowFiles. Feed the success from that input port to the FetchSFTP processor and now you have all nodes in your cluster retrieving the actual content. So as you can see from the above the listSFTP would only run on one node (Primary Node) producing no content FlowFiles. The RPG would smartly distribute those FlowFile across all connected nodes where the FetchSFTP on each Node would retrieve the actual content. The same flow above could be done with listFile and FetchFile as well, just mount the same source directory to every node and follow the same model. Matt
... View more
12:53 PM
Most threads are very short running (miliseconds) and since the NiFi refresh rate defaults are every 30 seconds. The number in the upper right corner may not represent a still running thread. In your screenshot above, the TailFile processor shows as having recorded the completion of 473,336 Tasks (Each task using a thread to complete) and a total cumulative thread time of only 2 min, 52 seconds and 334 milliseconds over the past 5 minutes. Long running threads will show much different stats in the Tasks/Time field.
... View more
08:33 PM
1 Kudo
@Hans Feldmann The individual processors allow for concurrent task changes. By default they all have one concurrent task. For each additional concurrent task, you are giving that processor the opportunity to request an additional thread from the NiFi controller to do work in parallel. (Think of it as two copies of the same processor doing working different files or batches of files). If there isn't sufficient files in the incoming queue then any additional concurrent tasks are not utilized. The flip side is if you allocate two many concurrent tasks to a single processor, that processor may itself end up using two many threads from the NiFi controller's resource pull resulting in a thread starvation to other processors. So star with the default and setup by one increment at a time in place of backlog in your flow. The NiFi controller also has a setting that limits the maximum number of threads it can use from the underlying hardware. This is the other thing Andrew was mentioning. A restart of NiFi is NOT needed when you make changes to these values. The defaults are low (10 timer driven and 5 event driven). I would set the timer driven to no more then double the number of cores your hardware has. Thanks, Matt
... View more
05:15 PM
@Yogesh Sharma You are seeing duplicate data because the run schedule on your invokeHTTP processor is set to 1 sec and the data you are pulling is not updated that often.
You can build in to your flow the ability to detect duplicates (even across a NiFi cluster). In order to do this you will need the following things setup: 1. DistributedMapCacheServer (Add this controller service to "Cluster Manager" if clustered. If standalone it still needs to be added. This is configured with a listening port) 2. DistributedMap CacheClientService (Add this controller service to "Node" if clustered. If standalone it still needs to be added. This is configured with teh FQDN of the NCM running the above Cache Server.) 3. Start the above controller services. 4. Add a HashContent and DetectDuplicate processors to your flow between your invokeHTTP processor and the SplitJson processors.
I have attached a modified version of your template.
eqdataus-detectduplicates.xml If you still see duplicates, adjust the configured age off duration in the DetectDuplicate processor.
Thanks, Matt
... View more
12:40 PM
2 Kudos
@Obaid Salikeen Not sure what "issues" you had when you tried to add a new node to your existing cluster. The components (processors, connections, etc...) of an existing cluster can be running when you add new additional nodes to it. The new nodes will inherit the flow and templates from the NCM as well as the current running state of those components when it joins.
But, in order for a node to successfully join a cluster the following must be true:
1. The new node either has no flow.xml.gz file and templates directory or the flow.xml.gz file and templates do not match what is currently on the NCM. (Remove flow.xml.gz file and templates dir from new node and restart node) The nifi-app.log will indicate in if a difference was found. 2. The nifi.sensitive.props.key= in the file must have the same value as on the NCM. 3. The NCM must be able to resolve the URL to the new node. If the nifi.web.http(s).host= was left blank on your new node, Java on that node may be reporting the hostname as localhost. Make sure valid resolvable hostnames are supplied for, nifi.cluster.node.address=, and nifi.cluster.node.unicast.manager.address=. 4. Both NCM and Node security protocol must match. in file. 5. Firewalls must be open between NCM and Node on both HTTP(s) port and node and NCM ports. 6. New node must have all the same available java classes. If custom processors exist in your flow make sure the new node also has those custom nar/jar files included in its lib dir. Thanks, Matt
... View more
09:20 PM
1 Kudo
@Saikrishna Tarapareddy
Where all 74 files in the input queue before the MergeContent was run?
The mergeContent processor just like the other processors works on a run schedule. My guess is that last file was not in the queue at the moment the MergeContent processor ran, so you only saw 13 get bundled instead of 14. With a min of 4 entries, it will read what is on the queue and bin it. You likely ended up with 3 bins with 20 and 1 bin with 13 because at the moment it looked at the queue 73 or 13 FlowFiles is all it saw.
You can confirm this by stopping the MergeContent and allowing all 74 files to queue before staring it. The behavior should then be as you suspect.
Sounds like it is not important to have exactly 20 per merged file. Perhaps you can set a max bin age so that files don't get stuck.
Something else you can do is adjust the run schedule so the mergeContent does not run as often. The default is "o sec" which means run as fast as possible. Try changing that to somewhere between 1 and 10 sec to give the files a chance to queue. If you are picking up all the 74 files at the same time, we are likely talking milliseconds here that is causing this last file to get missed. Thanks, Matt
... View more
02:48 PM
The attached images do not really show us your complete configuration. Can you generate a template of your flow through the NiFi UI and share that? You create a template by highlighting/selecting all components you want to include in your template and then click on the "create template" icon in the upper center of the UI. After the template has been created you can export it out of your NiFi from the template management UI icon (upper right corner of UI). Then attach that exported xml template here.
... View more
01:50 PM
With a NiFi cluster, every node in that cluster runs the exact same dataflow. Some data ingest type processors are not ideally suited for this as they may complete or pull the same data in to each cluster node. In cases like this it is better to set the scheduling strategy on these processor to "On primary Node" so that the processor only runs on one node (primary node).
You can then use dataflow design strategies like RPGs (NiFi Site-to-Site) to redistribute the received data across all your NiFi cluster nodes for processing.
... View more