
Load balancing when fetching a file from a standalone NiFi instance and processing it in a NiFi cluster

Explorer

We have a NiFi setup in which a NiFi cluster is installed on a Hadoop cluster and a standalone NiFi instance runs on another server. The input file is generated on the file system of the standalone instance's server. We fetch the file using the ListFile/FetchFile processors in the standalone instance. Then, in the main cluster, we connect to the standalone instance using an RPG (Remote Process Group) and send the output to the NiFi cluster over site-to-site. As per my understanding, the part of the processing done inside the cluster will be distributed. Is this understanding correct? I would also like to know whether there is a way to distribute the fetching of the source file that we are doing in the standalone NiFi instance.

The flow we are using

In the standalone instance

ListFile->FetchFile->Outputport

In the NCM of the cluster

RPG(Standalone NIFI Instance)-->RPG(NIFI cluster)

Inputport->Routetext->PutHDFS(This is the processing done in the main cluster)

Let me know if you need any other information. Any inputs will be appreciated.

Regards,

Indranil Roy




Whenever you do site-to-site, it will automatically do load-balancing for you... So if you have your standalone doing ListFile -> FetchFile -> Output Port, then on your cluster all you need is an RPG pointing back to the Output Port on the standalone instance. There will be an instance of this RPG on each node of your cluster and each one will pull from the Output Port. No need to do another internal RPG.

See the site-to-site section here:

https://community.hortonworks.com/articles/16120/how-do-i-distribute-data-across-a-nifi-cluster.html

You can't really distribute the fetching of the source file if it is coming from the local filesystem. The only thing that can fetch that file is something with access to the local filesystem, which is your standalone NiFi. If it was a shared filesystem then you could.
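In the flow notation used elsewhere in this thread, the pull model described above would look roughly like this (a sketch of the advice, not an exact configuration):

```
Standalone instance:   ListFile -> FetchFile -> Output Port
Each cluster node:     RPG (pointing at the standalone Output Port) -> RouteText -> PutHDFS
```

Since the RPG runs on every node of the cluster, each node pulls its own share of FlowFiles from the standalone Output Port.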

Explorer

Thanks @Bryan Bende for your input

As per your suggestion I have included two different flows as given below:

Flow1

=====

In the standalone instance

ListFile->FetchFile->Outputport

In the NCM of the cluster

RPG(Standalone NIFI Instance)-->Routetext->PutHDFS(This is the processing done in the main cluster)

Flow2

=====

In the standalone instance

ListFile->FetchFile->Input port of RPG(NIFI Cluster URL of the NCM)

In the NCM of the cluster

Input port->RouteText->OutputPort

Which flow, according to you, is correct? I understand that the fetching of the source file cannot be distributed if the file is not shared, but which of the flows will be apt for distributing the part of the processing done inside the cluster?

Regards,

Indranil Roy


Both of those flows should have the data distributed in the cluster...

Flow #1 is a pull model where all the nodes in the cluster will pull data from the standalone.

Flow #2 is a push model where the standalone will push the data to all the nodes in the cluster.

Either approach is correct, but I tend to lean towards the push model (#2) since it lets the source of the data (the standalone instance) decide where to send the data.

Mentor
@INDRANIL ROY

What you describe is a very common dataflow design.

I have a couple question for clarity.

RPGs (Remote Process Groups) do not send to other RPGs. An RPG sends data to and pulls data from input and output ports located on other NiFi instances. I suspect your standalone instance has the RPG and it is sending FlowFiles to input port(s) on the destination NiFi cluster. In this particular case the load-balancing of data is handled by the RPG. For network efficiency, data is distributed in batches, so with light dataflows you may not see the exact same number of FlowFiles going to each node. The load-balancing also has logic built into it so that nodes in the target cluster with a lighter workload get more FlowFiles. Although the URL provided to the RPG is the URL of the target NiFi cluster's NCM, the FlowFiles are not sent to the NCM, but rather sent directly to the connected nodes in the target cluster.

Every node in a NiFi cluster operates independently of the others, working only on the FlowFiles it possesses. Nodes do not communicate with one another; they simply report their health and status back to the NCM. It is information from those health and status heartbeats that is sent back to the source RPG and used by that RPG to do the smart data delivery.

In order to distribute the fetching of the source data, the source directory would need to be reachable by all nodes in the target NiFi cluster. In the case of ListFile/FetchFile, the directory would need to be mounted identically on all systems.

Another option would be to switch to a ListSFTP/FetchSFTP setup. In this setup you would not even need your standalone NiFi install. You could simply add a ListSFTP processor to your cluster (configured to run "On Primary Node"). Then take the success from that listing and feed it to an RPG that points back at the cluster's NCM URL. An input port would be used to receive the now load-balanced FlowFiles. Feed the success from that input port to the FetchSFTP processor, and now you have all nodes in your cluster retrieving the actual content.
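In the flow notation used elsewhere in this thread, that cluster-only setup would look roughly like this (a sketch based on the description above):

```
On the cluster:
ListSFTP (scheduled "On Primary Node") -> RPG (cluster's own NCM URL)
Input Port -> FetchSFTP -> (rest of the processing)
```

The listing happens once on the primary node, while the fetches are spread across all nodes by the RPG.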

6927-screen-shot-2016-08-24-at-101420-am.png

So, as you can see from the above, the ListSFTP processor would run on only one node (the Primary Node), producing FlowFiles that have attributes but no content. The RPG would smartly distribute those FlowFiles across all connected nodes, where the FetchSFTP on each node would retrieve the actual content.

The same flow above could be done with ListFile and FetchFile as well; just mount the same source directory on every node and follow the same model.

Matt

Explorer

Hi @mclark, thanks for the alternate approach you suggested. It could be helpful in my case.

Say in the scenario mentioned above we have a single input file whose size is on the order of terabytes. If we use ListSFTP/FetchSFTP processors in the way you mentioned to distribute the fetching of data:

  • Do we need to establish an SFTP channel between every slave node of the cluster and the remote server that houses the source file for this approach to work?
  • Is it a good idea to use SFTP to fetch the file, considering the size of the file will be in terabytes?
  • What are the parameters on which the performance of the fetch using ListSFTP/FetchSFTP will depend?

Mentor

@INDRANIL ROY

Given the massive size of your file, ListSFTP/FetchSFTP may not be the best approach. Let me ask a few questions:

1. Are you picking up numerous files of this multi-TB size or are we talking about a single file?

2. Are you trying to send the same TB file to every Node in your cluster or is each node going to receive a completely different file?

3. Is the directory where these files are originally consumed from a local disk or a network mounted disk?

Explorer

@mclark

1) We are talking about a single file in terabytes.

2) There is a single file and the processing should be distributed.

3) The file is in a local directory.

So is it a good idea?

Mentor
@INDRANIL ROY

NiFi does not distribute processing of a single file across multiple nodes in a NiFi cluster. Each node works on its own set of files. The nodes themselves are not even aware other nodes exist. They work on the files they have and report their health and status back to the NiFi Cluster Manager (NCM).

1. What format is this file in?

2. What kind of processing are you trying to do against this files content?

3. Can the file be split into numerous smaller files (depending on the file content, NiFi may be able to do the splitting)?

As an example:

A common dataflow involves processing very large log files. The large log file is processed by the SplitText processor to produce many smaller files. These smaller files are then distributed across a cluster of NiFi nodes where the remainder of the processing is performed.
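As an illustration of that splitting step, here is a minimal Python sketch (not NiFi code) that mimics what a fixed line-count split like SplitText produces; the function name and chunk size are made up for the example:

```python
# Minimal sketch of a line-count split: a large set of lines becomes
# many smaller chunks that can then be load-balanced independently
# across nodes. Names and sizes here are illustrative, not NiFi API.
def split_lines(lines, lines_per_split):
    """Yield successive chunks of at most lines_per_split lines."""
    for start in range(0, len(lines), lines_per_split):
        yield lines[start:start + lines_per_split]

# A 10-line "file" split into chunks of 3 lines -> 4 chunks (3, 3, 3, 1).
records = [f"record-{i}" for i in range(10)]
chunks = list(split_lines(records, 3))
```

In NiFi the equivalent knob is the SplitText processor's line-count setting; the smaller each split, the more evenly the splits can be spread across the cluster.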

There are a variety of pre-existing "split" type processors.

Thanks,

Matt

Contributor

To your questions:

  1. "Will the processing be distributed" - Yes, the incoming flow of data will be evenly distributed to all available NiFi instances in the NiFi cluster. The NCM will act as the load balancer.
  2. "Distribute the fetching of the source file" - Could you elaborate on what you mean by this? In your existing example the standalone instance is the only one which has access to its local filesystem. How would you prefer to distribute this?

Mentor

Just to clarify how S2S works when communicating with a target NiFi cluster: the NCM never receives any data, so it cannot act as the load balancer. When the source NiFi communicates with the NCM, the NCM returns a list of all currently connected nodes and their S2S ports, along with the current load on each node, to the source NiFi. It is then the job of the source NiFi's RPG to use that information to do a smart load-balanced delivery of data to those nodes.

Contributor

Appreciate the correction 😉

Explorer

@mclark

We have a single large (terabytes) file coming to a standalone node. We want to distribute the processing. Is it a good approach to split the file into multiple smaller files using the SplitText processor so that the processing is distributed across the nodes of the cluster?

In such a case we are considering the flow given below:

In the NCM of the cluster

input->RouteText->PutHDFS

In the standalone instance that has the incoming flow file

ListFile->FetchFile->SplitText->UpdateAttribute->RPG(NCM url)

Does this setup ensure the processing is distributed?

Mentor

@INDRANIL ROY That is the exact approach I suggested in my response in the thread above. Each node will only work on the FlowFiles it has in its possession. By splitting this large TB file into many smaller files, you can distribute the processing load across your downstream cluster. The distribution of FlowFiles via the RPG works as follows: the RPG communicates with the NCM of your NiFi cluster, and the NCM returns to the source RPG a list of the available nodes in its cluster and their S2S ports, along with the current load on each. It is then the responsibility of the RPG to do smart load-balancing of the data in its incoming queue to these nodes. Nodes with higher load will get fewer FlowFiles. The load balancing is done in batches for efficiency, so under light load you may not see an exactly balanced delivery, but under higher FlowFile volumes you will see a balanced delivery in the five-minute delivery statistics.

Thanks,

Matt

Explorer

Thanks @mclark for your input.

As suggested we implemented the data flow as shown below:

1) The standalone flow (this is where the single source file arrives). The RPG in this flow refers to the cluster, i.e. the NCM URL.

7296-mr7gf.png

2) The NCM of the cluster has the flow shown below:

7297-ts94p.png

3) In this approach we were facing the error shown below and getting only a subset of the records in HDFS.

7300-afiuy.png

4) So to avoid this situation we used a MergeContent processor to merge the FlowFiles, since we were splitting them before loading them into HDFS.

7293-scw3q.png

5) We configured MergeContent as shown below:

7294-ckmri.png

But even after this implementation we are not getting all the records in HDFS.

The source file has 10000000 records, and approximately 5000000 records should go to each HDFS directory. But we are getting around 1000000 records in each target directory, and the error shown below in the PutHDFS processors.

We are getting the same error as mentioned in the snapshot attached to point 3 above.

Are we missing something very intrinsic here? Is there something wrong with the design?

We are using a 3 node cluster with NCM and 2 slave nodes. And the source file is coming to a standalone server.

Let me know if you need any other information. Any inputs would be appreciated.

Regards,

Indranil Roy

Mentor

@INDRANIL ROY

Please share how you have your SplitText and RouteText processors configuration.

If I understand your end goal, you want to take this single file with 10,000,000 entries/lines and route only lines meeting criteria 1 to one PutHDFS, while routing all other lines to another PutHDFS?

Thanks,

Matt

Explorer

@mclark

Our RouteText processor is configured as shown below

7231-condition.png

As it shows, records whose first field (the record number) is <= 5000000 go in one direction, and records with record number >= 5100000 go in another.

The SplitText processor is configured with the properties below:

7234-split-config-final.png

Just to give an overview of our requirement:

1) We have a single file as source coming to a standalone server.

2) We fetch the file, split it into multiple files, and then send them to the cluster in order to distribute the processing across all the nodes of the cluster.

3) In the cluster we route the files based on a condition so that records whose first field (the record number) is <= 5000000 go to one output directory in HDFS and records with record number >= 5100000 go to another output directory in HDFS, as configured in the two PutHDFS processors.

4) But after executing the process we have around 1000000 records in each output directory, whereas ideally we should have approximately 5000000 records in each HDFS directory.
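To make the routing condition in step 3 concrete, here is a small Python sketch (illustrative only; the bucket names are made up and the thresholds mirror the post):

```python
# Route a line by its first field (the record number), mirroring the
# RouteText conditions described above. Bucket names are illustrative.
def route_record(line, low=5000000, high=5100000):
    record_number = int(line.split(",")[0])
    if record_number <= low:
        return "target_a"   # first HDFS output directory
    if record_number >= high:
        return "target_b"   # second HDFS output directory
    return "unmatched"      # record numbers strictly between the thresholds match neither rule
```

Note that, as written, record numbers strictly between 5000000 and 5100000 match neither condition.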

We are also getting the below error in the PutHDFS processors:

7300-afiuy.png

Please let me know if you need any further information.

Just to add: the above setup works perfectly fine when we use a standalone node with PutFile instead of PutHDFS, writing the files to a local path instead of Hadoop.

Regards,

Indranil Roy

Mentor
@INDRANIL ROY

Your approach above looks good, except you really want to split that large 50,000,000-line file into many more smaller files. Your example shows you splitting it into only 10 files, which may not ensure good file distribution to the downstream NiFi cluster nodes. The RPG load-balances batches of files (up to 100 at a time) for speed and efficiency. With so few files it is likely that every file will still end up on the same downstream node instead of being load-balanced. However, if you were to split the source file into ~5,000 files, you would achieve much better load-balancing.

Thanks,

Matt

Explorer

@mclark

Sure, I will try that option. I understand that I need to increase my split count in order to achieve better load balancing. But to go back to the main issue I raised in the thread above: apart from the performance aspect, we were getting only a subset of the records in HDFS. It seems the process was trying to create the same file and overwrite it multiple times, hence the error shown below. When I use the SplitText processor, send the splits to the RPG, and then merge them, I get the error shown (attached).

7300-afiuy.png

Just so we are on the same page, my flow looks like below:

In the NCM of the cluster

flow.png

In the standalone instance

flow1.png

Does increasing the split count solve this problem?

Also, is it necessary to use MergeContent/UpdateAttribute if we use SplitText? Can't we achieve this flow without the MergeContent/UpdateAttribute processors?

Regards,

Indranil Roy

Mentor
@INDRANIL ROY

The output from the SplitText and RouteText processors is a bunch of FlowFiles, all with the same filename (the filename of the original FlowFile they were derived from). NiFi differentiates these FlowFiles by assigning each a unique identifier (UUID). The problem you have is that when writing to HDFS, only the first FlowFile written with a particular filename is successful; all others result in the error you are seeing. The MergeContent processor you added reduces the impact but does not solve your problem. Remember that nodes do not talk to one another or share files with one another. So each MergeContent is working on its own set of files, all derived from the same original source file, and each node is producing its own merged file with the same filename. The first node to successfully write its file to HDFS wins, and the other nodes throw the error you are seeing. What is typically done here is to add an UpdateAttribute processor after each of your MergeContent processors to force a unique filename on each of the FlowFiles before writing to HDFS.

The UUID that NiFi assigns to each of these FlowFiles is often prepended or appended to the filename to solve this problem:

7401-screen-shot-2016-09-06-at-124041-pm.png

If you do not want to merge the FlowFiles, you can simply add the UpdateAttribute processor in MergeContent's place. You will just end up with a larger number of files written to HDFS.
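As a concrete example, the UpdateAttribute processor can overwrite the filename attribute using NiFi Expression Language; `filename` and `uuid` are core FlowFile attributes, and the `-` separator here is an arbitrary choice:

```
# Added property on the UpdateAttribute processor:
filename = ${filename}-${uuid}
```

Each FlowFile then carries a globally unique filename into PutHDFS, so no two nodes attempt to write the same path.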

Thanks,

Matt

Explorer

@mclark

Thanks for your inputs. The above solution worked perfectly in my case, both in terms of the error and performance. But as you already mentioned, in this situation we end up with a large number of files in HDFS. Even if I use a MergeContent processor in the flow, I am getting more than one file. From what I can understand by looking at the provenance, the MergeContent processor is merging files in blocks. Say we have 100 FlowFiles coming to the MergeContent processor in batches of 30, 30, 20, 20; it will not wait for all 100 files and will instead generate 4 output files by merging in groups. Is there a way by which we can control this behavior and enforce it to produce only one output file for each output path?

mergecontent.png

This is the configuration of the MergeContent processor. Any inputs will be very helpful.

Regards,

Indranil Roy
