Member since
07-30-2019
3396
Posts
1619
Kudos Received
1001
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 420 | 11-05-2025 11:01 AM |
| | 325 | 11-05-2025 08:01 AM |
| | 456 | 11-04-2025 10:16 AM |
| | 673 | 10-20-2025 06:29 AM |
| | 813 | 10-10-2025 08:03 AM |
07-07-2017
01:16 PM
@Mark Heydenrych You may be able to use the ReplaceText processor to remove those blank lines from your input FlowFile's content before the SplitText processor. I did a little test that worked for me using the following configuration: This evaluates your FlowFile line by line and replaces the line return (\n) with nothing on any line that starts with a line return. This effectively removes that blank line. After that, my SplitText reported the correct fragment.count when I split the file. Thanks, Matt
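To illustrate the idea outside of NiFi, here is a minimal Python sketch of the same line-by-line replacement. It is not the ReplaceText configuration from the test above; the function name `remove_blank_lines` and the regular expression are assumptions chosen for illustration, and it only handles `\n` line endings.

```python
import re


def remove_blank_lines(content: str) -> str:
    """Drop every line that consists only of a line return.

    (?m) makes ^ match at the start of each line, so "^\n" matches
    a line that begins (and therefore ends) with a line return.
    """
    return re.sub(r"(?m)^\n", "", content)


if __name__ == "__main__":
    sample = "a\n\nb\n\n\nc\n"
    print(repr(remove_blank_lines(sample)))
```

With the blank lines gone, a per-line split would only count lines that actually carry data.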
05-07-2018
04:48 PM
@srinivas p *** Forum tip: Avoid responding to existing answers with a new answer. Instead, use comments to correspond within a single answer. That being said, your environment is very different from the environment in the original question, with far fewer nodes. Are you running the same version of HDF/NiFi? I actually recommend starting a new question with your environment-specific details. You'll get more traction answer-wise that way. Thanks, Matt
08-11-2017
07:31 AM
2 Kudos
@Matt Clarke I have created a Jira ticket for this enhancement: https://issues.apache.org/jira/browse/NIFI-4284
07-05-2017
06:48 PM
@Bharadwaj Bhimavarapu General guidance here is that these values should be set to 2 times the number of available cores, and no more than 4 times the number of available cores, on a single instance of NiFi. If you are running a NiFi cluster, these values are enforced per node. So a setting of 16 in a 4 node cluster equates to a total of 64 threads across the cluster. Setting these values too high just results in many more threads in CPU wait and will not help performance at all. Beyond increasing these values, you need to be mindful of how many concurrent tasks you assign to each of your processors. Some processors are more CPU intensive than others (meaning they take longer to complete a job, holding the thread much longer). You can look at the "Tasks/Time" stats on a processor to see if its threads are long or short running. For processors that have long running threads, you want to be extra careful about how many concurrent tasks you assign them. Thanks, Matt
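The sizing guidance above is simple arithmetic, sketched here in Python. The function names are hypothetical and nothing in this snippet talks to NiFi; it only restates the 2x to 4x rule and the per-node multiplication.

```python
import os
from typing import Tuple


def thread_pool_range(cores: int) -> Tuple[int, int]:
    """Return the (low, high) guidance range: 2x to 4x the available cores."""
    if cores < 1:
        raise ValueError("cores must be at least 1")
    return 2 * cores, 4 * cores


def cluster_total_threads(per_node_setting: int, node_count: int) -> int:
    """The setting is enforced per node, so the cluster total is a product."""
    return per_node_setting * node_count


if __name__ == "__main__":
    cores = os.cpu_count() or 1
    low, high = thread_pool_range(cores)
    print(f"{cores} cores: set between {low} and {high} threads per node")
    print("16 threads on 4 nodes:", cluster_total_threads(16, 4), "cluster-wide")
```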
06-26-2017
04:43 PM
7 Kudos
The NiFi Site-to-Site (S2S) protocol is used by NiFi's Remote Process Group (RPG) components to distribute FlowFiles from one NiFi instance to another. When the target NiFi is a NiFi cluster, load-balancing of the FlowFile delivery is done across all nodes in the target NiFi cluster.
The default way this works (and the only way it works in versions of NiFi previous to Apache NiFi 1.2.0 or HDF 3.0) is as follows:
The RPG regularly communicates with the target NiFi cluster to get load status information about each node in the cluster. This information includes the number of currently connected nodes in the target cluster, each node's hostname, port information, and the number of total queued FlowFiles on each target NiFi node.
The source NiFi uses this information to determine a data distribution strategy for the FlowFiles it has queued.
- Let's assume a 4 node target NiFi cluster, with all nodes reporting a zero queue count.
- Each node will then be scheduled to receive 25% of the data. This means a distribution pattern of node1, node2, node3, and then node4.
- Now let's assume the same 4 node target cluster; however, node 1 and node 2 report having a queue of FlowFiles, which results in the following:
- Node 1 and node 2 would get 16.67% of the data while node 3 and node 4 get 33.33% of the data. This results in a distribution pattern of node1, node2, node3, node4, node3, and then node4. So nodes 3 and 4 get twice the opportunity to receive data compared to nodes 1 and 2.
Once the distribution pattern is determined, the RPG connects to the first node and starts transferring data from the incoming queue to that node for 500 milliseconds or until the queue is empty. The next run of RPG will start sending to the next node in pattern and so on.
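As a rough illustration of the weighting described above, the following Python sketch builds a pattern from integer weights and reports each node's share. This is not NiFi's actual implementation; the round-based interleaving, the weights, and the function names are assumptions picked only because they reproduce the example pattern and percentages given above.

```python
from collections import Counter
from typing import Dict, List


def build_pattern(weights: Dict[str, int]) -> List[str]:
    """Interleave nodes so that a node with weight w appears w times.

    Round r includes every node whose weight is greater than r.
    """
    rounds = max(weights.values(), default=0)
    pattern: List[str] = []
    for r in range(rounds):
        pattern.extend(node for node, w in weights.items() if w > r)
    return pattern


def shares(pattern: List[str]) -> Dict[str, float]:
    """Percentage of pattern slots each node occupies, rounded to 2 places."""
    counts = Counter(pattern)
    total = len(pattern)
    return {node: round(100 * n / total, 2) for node, n in counts.items()}


if __name__ == "__main__":
    # Hypothetical weights: nodes 1 and 2 report queued FlowFiles, so they
    # get half the opportunities of nodes 3 and 4.
    pattern = build_pattern({"node1": 1, "node2": 1, "node3": 2, "node4": 2})
    print(pattern)
    print(shares(pattern))
```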
As you can see from this default distribution model, the data may not always be distributed as desired. The transfer was implemented this way for performance reasons. However, when working with very small FlowFiles, when FlowFiles come in a wide range of sizes from small to large, when a better network connection exists to one target node than to another, or when data comes in bursts instead of a continuous flow, the load-balancing will be less than ideal.
With the introduction of HDF 3.0 (Apache NiFi 1.2.0), additional configuration options were added to the RPG to control the number of FlowFiles (count), amount of data (size), and/or length of transaction time (duration) per RPG port connection. This gives users the ability to fine-tune their RPG connection to achieve better load-balancing results when dealing with lighter volume dataflows, network performance differences between nodes, etc.
These new configuration options can be set as follows:
Each input and output port configuration will need to be set individually.
Of course, setting count to a value of 1 sounds like a good way to achieve really good load-balancing, but it will cost you in performance since only one FlowFile will be sent in each transaction. So, there will be extra overhead introduced due to the volume of new connections being opened and closed. So you may find yourself playing around with these settings to achieve your desired load-balancing to performance ratio.
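To get a feel for the trade-off, here is a toy Python model of a single transaction bounded by count, size, and duration. It is only a sketch: it assumes a transaction ends as soon as any configured limit is reached, it simulates time with a made-up `ms_per_byte` rate instead of a real clock, and `BatchLimits` and `take_batch` are hypothetical names, not NiFi APIs.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class BatchLimits:
    count: Optional[int] = None        # max FlowFiles per transaction
    size_bytes: Optional[int] = None   # max bytes per transaction
    duration_ms: Optional[int] = None  # max transaction time


def take_batch(queue_sizes: List[int], limits: BatchLimits,
               ms_per_byte: float = 0.0) -> int:
    """Return how many queued FlowFiles one transaction would send.

    Assumption: the transaction stops once ANY configured limit is hit.
    """
    sent = 0
    total_bytes = 0
    elapsed_ms = 0.0
    for size in queue_sizes:
        if limits.count is not None and sent >= limits.count:
            break
        if limits.size_bytes is not None and total_bytes >= limits.size_bytes:
            break
        if limits.duration_ms is not None and elapsed_ms >= limits.duration_ms:
            break
        sent += 1
        total_bytes += size
        elapsed_ms += size * ms_per_byte  # simulated transfer time
    return sent


if __name__ == "__main__":
    queue = [100] * 10  # ten queued FlowFiles of 100 bytes each
    print(take_batch(queue, BatchLimits(count=1)))
    print(take_batch(queue, BatchLimits(size_bytes=250)))
```

In this model a count of 1 means ten transactions to drain ten FlowFiles, which is where the extra connection overhead comes from.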
----------------
How do I get better load-balancing in an older version of NiFi?
The RPG will send data based on what is currently in the incoming queue per transaction. By limiting the size of that queue, you can control the max number of FlowFiles that will transfer per transaction. You can set the back pressure object threshold on those incoming queues to limit the number of FlowFiles queued at any given time. This will cause FlowFiles to queue on the next upstream connection in the dataflow. If a source processor is feeding the RPG directly, try putting an UpdateAttribute processor between that source processor and the RPG so you have two connections. As each RPG execution runs and transfers what is on the queue, the queue will be refilled for the next transaction.
Apache NiFi 1.13+ update: In newer releases of NiFi, redistributing FlowFiles within the cluster was made much more efficient and easier through the new load-balanced connection feature. This new feature (stable in Apache NiFi 1.13+ versions) is a simple configuration change that can be done on a connection. It supports numerous strategies for redistribution of FlowFiles, but for load-balanced distribution, it offers a true round-robin capability you can't get from an RPG.
12-18-2017
02:00 PM
Thank you Matt, I too was facing a similar issue and your suggestion worked.
08-13-2019
12:25 PM
Hello @Matt Burgess. I am using the CaptureChangeMySQL processor to read event data from MySQL, but it is failing with the connection error "org.apache.nifi.reporting.initializationexception can't load database driver". Screenshots are attached for reference. MySQL driver path: file:////Users/kiran/Downloads/mysql/mysql-connector-java-8.0.17.jar Environment: Apache NiFi: 1.7.0, MySQL: 8.0.17, OS: macOS 10.14.4
06-15-2017
01:07 PM
@Prakash Ravi Nodes in a NiFi cluster have no idea about the existence of other nodes in the cluster. Nodes simply send health and status heartbeat messages to the currently elected cluster coordinator. As such, each node runs its own copy of the flow.xml.gz file and works on its own set of FlowFiles. So if you have 9 NiFi nodes, each node will be running its own copy of the ConsumeKafka processor. With 1 concurrent task set on the processor, each node will establish one consumer connection to the Kafka topic. So you would have 9 consumers for 10 partitions. So in order to consume from all partitions you will need to configure 2 concurrent tasks. This will give you 18 consumers for 10 partitions. Kafka will assign the partitions across this pool of 18 consumers. Ideally you would see 1 active consumer on 8 of your nodes and 2 on the remaining node. The data coming into your NiFi cluster will not be evenly balanced because of the imbalance in the number of consumers versus partitions.
As far as your Kafka broker rebalance goes, Kafka will trigger a rebalance if a consumer disconnects and another consumer connects. Things that can cause a consumer to disconnect include:
1. Shutting down one or more of your NiFi nodes.
2. Connection timeout between a consumer and a Kafka broker.
- Triggered by network issues between a NiFi node and a Kafka broker.
- Triggered by setting the ConsumeKafka run schedule longer than the configured timeout, for example a 60 second run schedule and a 30 second timeout.
- Triggered by back pressure being applied on the connection leading off ConsumeKafka, causing ConsumeKafka to not run until the back pressure is gone. *** This trigger was fixed in NiFi 1.2, but I don't know what version you are running.
If you feel I have addressed your original question, please mark this answer as accepted to close out this thread. Thank you, Matt
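The consumer counts above can be restated as a small Python sketch. It assumes every ConsumeKafka task joins the same consumer group, so a partition is read by at most one consumer at a time; `consumer_summary` is a hypothetical helper, not part of NiFi or any Kafka client.

```python
from typing import Dict


def consumer_summary(nodes: int, concurrent_tasks: int,
                     partitions: int) -> Dict[str, int]:
    """Count total consumers and how many can hold no partition.

    Assumes one consumer per concurrent task per node, all in one group.
    """
    consumers = nodes * concurrent_tasks
    idle = max(consumers - partitions, 0)
    return {"consumers": consumers, "idle": idle}


if __name__ == "__main__":
    print(consumer_summary(nodes=9, concurrent_tasks=1, partitions=10))
    print(consumer_summary(nodes=9, concurrent_tasks=2, partitions=10))
```

With 2 concurrent tasks, 8 of the 18 consumers have nothing to read, which is why the load across nodes ends up uneven.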
06-13-2017
04:31 PM
@forest lin NiFi at its core has no issues working with very large files. Oftentimes, when you run into OOM, it is because of what you are trying to do with those very large files after they are in NiFi. In the majority of cases, OOM can be avoided via dataflow design and tweaks to the heap size allocated to the NiFi JVM. The content of a FlowFile does not live in heap memory space, but the FlowFile attributes do (*** except when swapped out to disk in large queues). So avoid extracting large amounts of the content into FlowFile attributes, avoid trying to split very large files into large numbers of small FlowFiles using a single processor, avoid trying to merge a very large number of FlowFiles into a single FlowFile, etc. You can still do these types of things, but you may need to do them in two stages rather than one. For example, split large files by every 5000 lines first, and then split the 5000 line FlowFiles by every line (huge difference in heap usage). If you found this answer addressed your question, please mark it as accepted to close out this thread. Thanks,
Matt
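A back-of-the-envelope Python sketch of why the two-stage split helps, under the assumption that heap pressure tracks how many FlowFiles a single split step has to create at once. The helper names and the 1,000,000 line example are made up for illustration; this does not model NiFi's real memory behavior.

```python
from typing import Iterator, List, Tuple


def split_by_count(lines: List[str], n: int) -> Iterator[List[str]]:
    """Yield consecutive chunks of at most n lines."""
    for i in range(0, len(lines), n):
        yield lines[i:i + n]


def peak_outputs(total_lines: int, first_stage: int) -> Tuple[int, int]:
    """Largest number of outputs any single split step produces.

    Returns (one_stage_peak, two_stage_peak).
    """
    one_stage = total_lines
    stage1 = -(-total_lines // first_stage)  # ceiling division
    stage2 = min(first_stage, total_lines)
    return one_stage, max(stage1, stage2)


if __name__ == "__main__":
    # Hypothetical 1,000,000 line file split by 5000 lines, then by line.
    print(peak_outputs(1_000_000, 5000))
```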
06-08-2017
04:21 PM
@Matt Clarke Thanks, that answers my question. I have to move the contents of FlowFile B to its attributes, then fetch file C and parse it with more processors to set more attributes on the FlowFile, and finally restore the contents of the FlowFile to what I saved to the attributes. Sounds like I'll go with a scripted processor instead; that will save me a lot of headaches 🙂