Created on 03-01-2020 10:25 PM - last edited on 03-01-2020 10:33 PM by VidyaSargur
Hey Guys,
We need to establish links between dependent flows within the same NiFi cluster / canvas - e.g. one flow finishes its ETL, triggering another flow to begin its ETL on the first flow's result.
From what I gather this could be achieved in 3 ways:
1. Direct connections between flows - undesirable in our production environment, both for data governance reasons and because it creates a spider's web of dependencies
2. Remote Process Groups (RPGs) - if they can send / receive within the same NiFi cluster
3. HTTP
Could you please advise on options 2 & 3 as a solution for this? RPGs seem simpler than handling HTTP requests / responses, yet they require an input port at the top level of the flow, which would then need to be directly connected into each flow (effectively another version of option 1).
Thanks
Created 03-03-2020 01:48 PM
Option 2:
The good news is that as of Apache NiFi 1.10 you can create remote input and output ports at any process group level (https://issues.apache.org/jira/browse/NIFI-2933). This is probably your best option as it handles distribution across all available nodes in your cluster. If a node goes down, the Remote Process Group (RPG) will only distribute FlowFiles to the remaining nodes that are still available.
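For example (the process group, port, and host names here are just placeholders), the layout on a single cluster could look something like this:

Flow_A (process group)
    last processor of Flow_A --> Remote Process Group (URL pointing back at this same cluster, e.g. https://nifi-node-1:8443/nifi)
Flow_B (process group)
    remote input port "flow_b_trigger" --> first processor of Flow_B

The RPG then takes care of load-balancing the trigger FlowFiles across whichever nodes are currently connected.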
Option 3:
Here you would use a PostHTTP processor and a ListenHTTP processor. The downside to this option over option 2 is that the PostHTTP processor can only be configured to send to one URL endpoint. So if the target NiFi node is down, it will fail to send. Of course you could connect the failure relationship from one PostHTTP to another, and so on, adding as many PostHTTP processors as you have nodes, but this does not scale well.
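In other words, you would end up with a chain like this (node names and port are placeholders):

PostHTTP (http://nifi-node-1:8011/contentListener)
    --failure--> PostHTTP (http://nifi-node-2:8011/contentListener)
        --failure--> PostHTTP (http://nifi-node-3:8011/contentListener)
            --failure--> retry / alerting

and every node you add or remove means editing that chain by hand.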
Hope this helps,
Matt
Created 03-03-2020 07:40 PM
Hi Matt,
Thanks for your response, it makes sense.
1.10 seems cool, yet it may take some time to get productionised for us. Perhaps having one PostHTTP for each node and round-robinning could work, but only if we assume the set of nodes is fixed.
Or, is there a way to query the available nodes within NiFi, so we can set ${hostname} dynamically?
-- Dom
Created 03-04-2020 01:25 PM
Do you find yourself often adding or removing nodes from your NiFi cluster?
If not, creating one PostHTTP processor for each of your NiFi nodes would work.
As far as querying the connected nodes, sure, that can be done via a call to the REST API, but you will get JSON back from which you would need to parse the connected hostnames. Plus it is possible that, after you parse that info, one or more of those nodes disconnects. The PostHTTP would then fail and you would need to do that parsing all over again. This gets expensive in terms of processing and performance. This kind of logic is built into Remote Process Groups.
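For reference, the call would be roughly this (host, credentials, and the exact JSON layout depend on your NiFi version and security setup):

curl -s -k -u <user>:<password> https://nifi-node-1:8443/nifi-api/controller/cluster

The response contains a cluster.nodes array where each entry has an address and a status (e.g. "CONNECTED") that you would have to pull out yourself before building the target URLs.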
However, we can assume that the NiFi node processing the FlowFile must be up, or the FlowFile would not be getting processed. So you could just configure your PostHTTP processor to use a dynamic URL based on a NiFi Expression Language statement that uses the hostname of the node on which the FlowFile is already being processed.
Example PostHTTP processor configuration:
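Roughly, the key properties would be along these lines (the port and base path are just example values):

    URL: http://${hostname(true)}:8011/contentListener
    Send as FlowFile: true    (so the FlowFile attributes travel along with the content)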
Example ListenHTTP processor configuration:
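Again roughly, matching the same example port and path:

    Listening Port: 8011
    Base Path: contentListener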
The above allows you to send FlowFiles from a postHTTP to a ListenHTTP on the same node.
Note: Understand that you are adding additional reads and writes to the content repository and sending data across the network, which will add latency versus just routing the same FlowFile continuously within the canvas from one dataflow to another via directly connected dataflows.
Hope this solution helps you,
Matt
Created 03-04-2020 05:36 PM
Created 03-05-2020 10:21 AM
I see no issues with using a publish and consume design for triggering follow-on flows.
That provides a slightly more robust setup than the PostHTTP to ListenHTTP example I provided, since any NiFi node can consume the trigger file.
When using Kafka you will want to make sure you have the same number of partitions as you have consumers in the consumer group. When you add a ConsumeKafka processor to the NiFi canvas it is really being added to every node in your cluster and is configured the same on every node.
Let's assume you have a 3 node NiFi cluster; that means you have at a minimum 3 consumers in that consumer group, so your Kafka topic should have 3 partitions; otherwise, you may see a lot of rebalancing happen. To improve performance even more you can increase the concurrent tasks on your ConsumeKafka processor. With a 3 node NiFi cluster and a ConsumeKafka configured with 2 concurrent tasks, you now have 6 total consumers in the consumer group; therefore, your Kafka topic should have 6 partitions. If a NiFi node goes down, Kafka will assign multiple partitions to the same consumer, so there is no need to worry about messages not being consumed.
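As a rough illustration (broker list, topic name, group ID, and processor versions are just examples), the trigger could be as simple as:

PublishKafka_2_0 (at the end of the upstream flow)
    Kafka Brokers: broker1:9092,broker2:9092,broker3:9092
    Topic Name: etl-flow-a-complete
    Delivery Guarantee: Guarantee Replicated Delivery

ConsumeKafka_2_0 (at the start of the downstream flow)
    Kafka Brokers: broker1:9092,broker2:9092,broker3:9092
    Topic Name(s): etl-flow-a-complete
    Group ID: etl-flow-b-trigger
    Offset Reset: latest

with the topic created with as many partitions as you have total consumers (nodes x concurrent tasks), as described above.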
Hope this information was useful in finding a solution,
Matt