How to configure NiFi processors (that interface with external services) in a Cluster environment

avatar
Expert Contributor

Hello,

We are about to move from a standalone NiFi instance to a NiFi cluster. We developed our dataflow on the standalone instance, and according to the NiFi Admin documentation, some of the processors in our dataflow need to be configured differently to work in a cluster.

From the admin guide...

"...In a NiFi cluster, the same dataflow runs on all the nodes. As a result, every component in the flow runs on every node. However, there may be cases when the DFM would not want every processor to run on every node.

...the GetSFTP processor pulls from a remote directory, and if the GetSFTP on every node in the cluster tries simultaneously to pull from the same remote directory, there could be race conditions. Therefore, the DFM could configure the GetSFTP on the Primary Node to run in isolation, meaning that it only runs on that node. It could pull in data and -with the proper dataflow configuration- load-balance it across the rest of the nodes in the cluster...."

Our flow uses GetSFTP and ListenTCP, which fall into the case described above. The Admin guide says that "with the proper dataflow configuration" the data from these processors can be load balanced across the nodes, but it doesn't say what configuration is needed. Any suggestions or guidance would be greatly appreciated.

I'm thinking that the PutHDFS processor does not need any special configuration for a cluster environment: on each node it can write to HDFS whatever data is sent to it (from the dataflow on that node), so no coordination is needed between the PutHDFS processors running on the various nodes. Is my assumption correct?

I did find some guidance on how to configure PutEmail in a cluster environment.

Thanks in advance.

1 ACCEPTED SOLUTION

avatar

Hi @Raj B,

Regarding the processors you mention:

- GetSFTP: if you want to load balance the access to the SFTP server, then I'd recommend the use of ListSFTP running on the primary node and the use of FetchSFTP on all the nodes. This way the actual download of files will be load balanced on your cluster without concurrent access to the files.

- ListenTCP: it depends on whether you want all the nodes of your cluster listening on a given port for TCP connections (with a load balancer in front of NiFi, for example) or only one node listening on that port. However, keep in mind that there is no way to ensure that a given node will be the primary node, and having a client connect to one specific node is not recommended from an HA point of view.

- PutHDFS is fine as long as two nodes are not writing to the same path in HDFS.

Finally, there is one important point to remember about load balancing data that comes from a processor configured to run only on the primary node. If you do nothing special, all the flow files will remain on the primary node from the beginning to the end of the flow. To load balance the data, connect your input processors to a Remote Process Group (RPG) that points back to the cluster itself. This way the data is actually distributed across the cluster. You will find an example at the end of this post: https://pierrevillard.com/2016/08/13/apache-nifi-1-0-0-cluster-setup
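To make the list-then-fetch pattern above more concrete, here is a minimal Python sketch of the idea. It is not NiFi code: `list_remote_files`, `distribute`, the file names, and the node names are all hypothetical, and the round-robin assignment is only an assumption standing in for whatever distribution the Remote Process Group actually performs.

```python
from itertools import cycle


def list_remote_files():
    """Stand-in for the listing step, which runs once, on the primary node only."""
    # Hypothetical file names; a real listing would come from the SFTP server.
    return [f"/inbound/file_{i}.csv" for i in range(7)]


def distribute(items, nodes):
    """Stand-in for the load-balancing step: hand items to nodes round-robin."""
    assignment = {node: [] for node in nodes}
    for item, node in zip(items, cycle(nodes)):
        assignment[node].append(item)
    return assignment


nodes = ["node-1", "node-2", "node-3"]  # hypothetical cluster nodes
assignment = distribute(list_remote_files(), nodes)

# Each node then fetches only the files it was handed, so no file is
# downloaded twice and no two nodes compete for the same file.
for node, files in assignment.items():
    print(node, "fetches", files)
```

The point of the sketch is that only the cheap listing step is serialized on one node, while the expensive downloads are spread out.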

Hope this helps.

17 REPLIES

avatar
Expert Contributor

@Pierre Villard I don't mean to beat a dead horse, but does the ExecuteSQL processor have the same issue as the ListenX and GetX processors in a cluster environment (the processor runs on each node of the cluster, so the nodes need to coordinate which node reads which database records)?

avatar

ExecuteSQL is not a passive processor (it does not wait for clients to send data); it is an active processor in charge of getting the data. As long as your remote DB accepts concurrent access/requests, it's fine. If you retrieve data from a table and want to load balance the queries, have a look at the GenerateTableFetch and QueryDatabaseTable processors.

avatar
Expert Contributor

Thanks @Pierre Villard. I'm sorry, I'm not sure I understand it clearly. From what you said, it seems the ExecuteSQL processor does not need any coordination between nodes, because the DB that I'm querying takes care of the coordination (the DB knows that the queries from the different nodes are in fact from the same job, so it won't send the same data to each node's query even though the same query/dataflow runs on each node).

So, if ExecuteSQL runs on each node and each node in return gets different data, then the query's load is balanced across all nodes, right? Correct me if that thinking is wrong.

What do the GenerateTableFetch and QueryDatabaseTable processors do that ExecuteSQL does NOT do in a cluster environment? I'm not sure I understand when you would use the ExecuteSQL processor versus the other two.

Thanks again.

avatar

You're right. If you ensure that the ExecuteSQL on each node of the cluster takes care of its own part of the data, then you have load balancing. But if the same request is executed by every ExecuteSQL, the work won't be load balanced (unless your DB takes care of that for you).

GenerateTableFetch is meant to be executed on primary node to generate multiple flow files to retrieve all the data of a table, and each flow file will take care of one "page" of data. Then the flow files are distributed over the cluster and QueryDatabaseTable will actually fetch the data. But in this case you ensure that each flow file contains a query for different pieces of data of the same table. This way you won't duplicate data and you will have load balanced the queries.

ExecuteSQL can achieve the same kind of thing; it really depends on your use case and how you define your queries.
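The "one flow file per page" idea described above can be sketched in a few lines of Python. This is only an illustration of the paging technique, not what GenerateTableFetch emits: the function name, the table and column names, and the generic `LIMIT ... OFFSET` syntax are assumptions (Oracle and SQL Server use different paging syntax).

```python
def generate_page_queries(table, order_column, row_count, page_size):
    """Return one SELECT statement per page of rows.

    Each statement covers a distinct, non-overlapping slice of the table,
    so the statements can be executed on different nodes without
    duplicating data.
    """
    queries = []
    for offset in range(0, row_count, page_size):
        queries.append(
            f"SELECT * FROM {table} ORDER BY {order_column} "
            f"LIMIT {page_size} OFFSET {offset}"
        )
    return queries


# Hypothetical table with 25 rows, fetched 10 rows at a time: three pages.
queries = generate_page_queries("orders", "id", row_count=25, page_size=10)
for query in queries:
    print(query)
```

Generating the statements is cheap and happens in one place; running them is the expensive part, and that is the part that gets spread over the cluster.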

avatar
Expert Contributor

@Pierre Villard thank you for your continued engagement in this conversation.

1) It sounds like ExecuteSQL by default does not provide any coordination between nodes to ensure that a query is not duplicated across nodes, so I, as a developer, have to ensure that no duplication occurs (but I don't know what I need to do to make ExecuteSQL work correctly in a cluster). An alternative is the combination of the GenerateTableFetch and QueryDatabaseTable processors, which works properly when GenerateTableFetch runs as an "Isolated Processor" on the primary node and QueryDatabaseTable runs on all nodes. Is that a correct summary of what we have discussed thus far?

2) Also, you noted in your last comment: "ExecuteSQL can achieve the same kind of thing; it really depends on your use case and how you define your queries."

My use case is to query Oracle and SQL Server databases, both of which support parallelization of queries. How do I need to define my queries differently for ExecuteSQL to work in a cluster when querying a relational database that supports query parallelization?

3) Is there any documentation on which NiFi processors work differently in a cluster versus a single-node environment, and on what modifications need to be made for those processors to work correctly in a cluster? So far my understanding is that the ListenX, GetX, and ExecuteSQL processors work differently in a cluster.

Again, thank you for your time and patience.

avatar

1) Correct

2) It is really tied to your use case. Let's say you want to get the content of a database table that contains a column "jobId", and that you have as input a file listing all the job IDs for which you want to retrieve the associated data. You could have a GetFile -> SplitText -> ExtractText to get one flow file per job ID, with the job ID in an attribute. All of this would run on the primary node. Then you distribute the flow files over the cluster with an RPG. Finally, you can use ExecuteSQL with a query containing "WHERE jobId = ${jobId}". This way you have concurrent queries against the table, but each query accesses its own data. Again, it really depends on the use case and how you can split the queries.

3) Not that I am aware of. The main reason is that it really depends on the use case; there is no general rule.
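As a rough illustration of the job ID example in point 2, the following Python sketch uses an in-memory SQLite table. Everything in it is a stand-in: the `job_data` table, its rows, the node names, and the round-robin split are assumptions, and the bound parameter `?` plays the role that `${jobId}` plays in the NiFi query.

```python
import sqlite3

# Hypothetical table standing in for the real Oracle / SQL Server table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job_data (jobId TEXT, payload TEXT)")
conn.executemany(
    "INSERT INTO job_data VALUES (?, ?)",
    [("A", "a1"), ("A", "a2"), ("B", "b1"), ("C", "c1"), ("D", "d1")],
)

job_ids = ["A", "B", "C", "D"]  # one "flow file" per job ID
nodes = ["node-1", "node-2"]    # hypothetical cluster nodes

# Assumed round-robin distribution of job IDs over the nodes.
work = {node: job_ids[i::len(nodes)] for i, node in enumerate(nodes)}


def run_node(assigned_ids):
    """Run one query per assigned job ID, as each node would."""
    rows = []
    for job_id in assigned_ids:
        rows += conn.execute(
            "SELECT jobId, payload FROM job_data WHERE jobId = ? ORDER BY payload",
            (job_id,),
        ).fetchall()
    return rows


results = {node: run_node(ids) for node, ids in work.items()}
for node, rows in results.items():
    print(node, rows)
```

Because every query is restricted to a single job ID and each job ID is assigned to exactly one node, no row is read twice even though the queries run concurrently.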

Hope this helps.

avatar
Expert Contributor

Thank you @Pierre Villard

I think I'm getting there... 🙂