07-13-2017
12:42 PM
1 Kudo
@Akash S The ListHDFS processor records state so that only new files are listed. The processor also has a configuration option for recursing subdirectories. You could set the directory to just /MajorData/Location/ and let it list all files from the subdirectories. As new subdirectories are created, the files within those new directories will get listed. If that does not work for you, the NiFi Expression Language (EL) statement you are looking for would look something like this for the directory: /MajorData/Location/${now():format('yyyy/MM/dd')} The above would cause NiFi to look only in that day's target directory for files until the day changes. I am not sure of the rate at which files are written into these target directories, but be mindful that if a file is added between runs of the ListHDFS processor and the day changes between those runs, that file will not get listed using the above EL statement. Thanks, Matt
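To make the day-rollover caveat concrete, here is a minimal Python sketch. It is not NiFi code: `target_directory` is a hypothetical helper that only mimics what the EL statement resolves to, and the timestamps are made up for illustration.

```python
from datetime import datetime

BASE_DIR = "/MajorData/Location"  # base path taken from the post


def target_directory(now: datetime) -> str:
    """Mimic ${now():format('yyyy/MM/dd')} appended to the base directory."""
    return f"{BASE_DIR}/{now.strftime('%Y/%m/%d')}"


# Hypothetical timeline: a listing run, a late file, then the next run after midnight.
last_run = datetime(2017, 7, 13, 23, 59, 0)
file_written = datetime(2017, 7, 13, 23, 59, 30)
next_run = datetime(2017, 7, 14, 0, 0, 5)

# The file landed in the 07/13 directory after the last run that looked there.
file_dir = target_directory(file_written)
# The next run already resolves to the 07/14 directory, so the file is never seen.
missed = target_directory(next_run) != file_dir

print(target_directory(last_run))
print(target_directory(next_run))
print("late file missed:", missed)
```

Pointing the processor at the base directory with subdirectory recursion avoids this gap, since the listed path no longer changes at midnight.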
07-07-2017
01:16 PM
@Mark Heydenrych You may be able to use the ReplaceText processor to remove those blank lines from your input FlowFile's content before the SplitText processor. I did a little test that worked for me using the following configuration: This evaluates your FlowFile line by line and replaces the line return (\n) with nothing on any line that starts with a line return. This effectively removes that blank line. After that, my SplitText reported the correct fragment.count when I split the file. Thanks, Matt
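The effect of that replacement can be sketched outside NiFi with a plain Python regular expression. This is only an illustration of the idea, not the ReplaceText configuration itself; the function name is hypothetical, and it assumes lines end in \n only (not \r\n).

```python
import re


def remove_blank_lines(content: str) -> str:
    """Drop every line that consists of nothing but a line return."""
    # (?m) makes ^ match at the start of each line, so "^\n" only matches
    # a line that starts (and therefore ends) with the line return.
    return re.sub(r"(?m)^\n", "", content)


sample = "a\n\nb\n\n\nc\n"
cleaned = remove_blank_lines(sample)
print(repr(cleaned))
```

Lines that contain only spaces or tabs are left alone by this pattern, so content like that would need a broader expression.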
07-06-2017
01:28 PM
@Mark Heydenrych I filed an Apache Jira requesting a change to this behavior: https://issues.apache.org/jira/browse/NIFI-4156 If you found this answer addressed your question, please mark the answer as accepted. Thank you,
Matt
07-06-2017
12:37 PM
2 Kudos
@Mark Heydenrych The default configuration of the SplitText processor is to not emit FlowFiles whose content is just a blank line. This behavior is controlled by the "Remove trailing Newlines" property. The fragment.count attribute is set based on the total number of fragments in the original FlowFile's content. The fragment.index attribute is a one-up number assigned to each FlowFile emitted. So in your case, I suspect that your original FlowFile's content contained 66,443 lines, 13 of which were just blank lines that were not emitted. If you change "Remove trailing Newlines" to "false", your emitted count will match your fragment.count. Thanks, Matt
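The mismatch can be reproduced with a small Python model of the behavior described above. This is a simplified sketch, not the SplitText implementation: `split_lines` and its parameter are hypothetical names, and the input is generated to match the numbers in this thread.

```python
def split_lines(content: str, remove_trailing_newlines: bool = True):
    """Return (fragment_count, emitted_lines) for a one-line-per-split model."""
    lines = content.split("\n")
    if lines and lines[-1] == "":
        lines.pop()  # drop the empty piece after the final line return
    # fragment.count is based on every line in the original content.
    fragment_count = len(lines)
    # Blank lines are only emitted when the property is set to false.
    emitted = [line for line in lines if line != "" or not remove_trailing_newlines]
    return fragment_count, emitted


# 66,430 data lines plus 13 blank lines = 66,443 lines in total.
content = "\n".join([""] * 13 + ["row"] * 66430) + "\n"

count_default, emitted_default = split_lines(content, remove_trailing_newlines=True)
count_false, emitted_false = split_lines(content, remove_trailing_newlines=False)
print(count_default, len(emitted_default))
print(count_false, len(emitted_false))
```

In this model the count and the emitted total only agree once blank lines are emitted, or once they are removed before the split.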
06-26-2017
04:43 PM
7 Kudos
The NiFi Site-to-Site (S2S) protocol is used by NiFi's Remote Process Group (RPG) components to distribute FlowFiles from one NiFi instance to another. When the target NiFi is a NiFi cluster, load-balancing of the FlowFile delivery is done across all nodes in the target NiFi cluster.
The default way this works (and the only way it works in versions of NiFi prior to Apache NiFi 1.2.0 or HDF 3.0) is as follows:
The RPG regularly communicates with the target NiFi cluster to get load status information about each node in the cluster. This information includes the number of currently connected nodes in the target cluster, each node's hostname, port information, and the number of total queued FlowFiles on each target NiFi node.
The source NiFi uses this information to determine a data distribution strategy for the FlowFiles it has queued:
- Let's assume a 4-node target NiFi cluster with all nodes reporting a zero queue count.
- Each node will then be scheduled to receive 25% of the data. This means a distribution pattern of node1, node2, node3, and then node4.
- Now let's assume the same 4-node target cluster; however, node1 and node2 report having a queue of FlowFiles that results in the following:
- Node1 and node2 would get 16.67% of the data, while node3 and node4 would get 33.33% of the data. This results in a distribution pattern of node1, node2, node3, node4, node3, and then node4. So node3 and node4 get twice the opportunity to receive data compared to node1 and node2.
Once the distribution pattern is determined, the RPG connects to the first node and starts transferring data from the incoming queue to that node for 500 milliseconds or until the queue is empty. The next run of the RPG will start sending to the next node in the pattern, and so on.
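The relationship between per-node shares and the resulting pattern can be sketched in a few lines of Python. This is only an illustration: the integer weights are assumed inputs chosen to match the example above, and the sketch does not reproduce how NiFi derives them from the reported queue counts.

```python
from typing import Dict, List


def distribution_pattern(weights: Dict[str, int]) -> List[str]:
    """Build a send order in which each node appears as often as its weight."""
    remaining = dict(weights)
    pattern: List[str] = []
    # Walk the nodes in order, pass after pass, until every weight is used up.
    while any(count > 0 for count in remaining.values()):
        for node in weights:
            if remaining[node] > 0:
                pattern.append(node)
                remaining[node] -= 1
    return pattern


def shares(weights: Dict[str, int]) -> Dict[str, float]:
    """Percentage of the data each node is scheduled to receive."""
    total = sum(weights.values())
    return {node: round(100 * w / total, 2) for node, w in weights.items()}


# Assumed weights: node1 and node2 report queued FlowFiles, so they get half
# the opportunities of node3 and node4.
weights = {"node1": 1, "node2": 1, "node3": 2, "node4": 2}
print(distribution_pattern(weights))
print(shares(weights))
```

With equal weights the same function yields the plain node1, node2, node3, node4 rotation and a 25% share per node.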
As you can see from this default distribution model, the data may not always be distributed as desired. The transfer was implemented this way for performance reasons. However, when working with very small FlowFiles, when FlowFiles come in a wide range of sizes from small to large, when a better network connection exists to one target node than to another, or when data comes in bursts instead of a continuous flow, the load-balancing will be less than ideal.
With the introduction of HDF 3.0 (Apache NiFi 1.2.0), additional configuration options were added to the RPG to control the number of FlowFiles (count), amount of data (size), and/or length of transaction time (duration) per RPG port connection. This gives users the ability to fine-tune their RPG connection to achieve better load-balancing results when dealing with lighter volume dataflows, network performance differences between nodes, etc.
These new configuration options can be set as follows:
Each input and output port configuration will need to be set individually.
Of course, setting count to a value of 1 sounds like a good way to achieve really good load-balancing, but it will cost you in performance, since only one FlowFile will be sent in each transaction and extra overhead is introduced by the volume of new connections being opened and closed. So you may find yourself playing around with these settings to achieve your desired balance between load-balancing and performance.
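The cost of a small count can be seen in a rough Python model of per-transaction batching. All names here (`FlowFile`, `take_batch`, `transactions_needed`) are hypothetical, the duration limit is left out for simplicity, and the model only counts transactions rather than measuring real connection overhead.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FlowFile:
    name: str
    size_bytes: int


def take_batch(queue: List[FlowFile],
               max_count: Optional[int] = None,
               max_size_bytes: Optional[int] = None) -> List[FlowFile]:
    """Remove FlowFiles from the front of the queue until a limit is reached."""
    if max_count is not None and max_count < 1:
        raise ValueError("max_count must be at least 1")
    batch: List[FlowFile] = []
    total = 0
    while queue:
        if max_count is not None and len(batch) >= max_count:
            break
        if max_size_bytes is not None and total >= max_size_bytes:
            break
        flowfile = queue.pop(0)
        batch.append(flowfile)
        total += flowfile.size_bytes
    return batch


def transactions_needed(num_flowfiles: int, max_count: int) -> int:
    """Count how many transactions it takes to drain a queue of 1 KB FlowFiles."""
    queue = [FlowFile(f"f{i}", 1024) for i in range(num_flowfiles)]
    transactions = 0
    while queue:
        take_batch(queue, max_count=max_count)
        transactions += 1
    return transactions


print(transactions_needed(1000, max_count=1))
print(transactions_needed(1000, max_count=100))
```

In this model a count of 1 needs one transaction per FlowFile, while a count of 100 spreads the same data over far fewer transactions at the price of coarser distribution.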
----------------
How do I get better load-balancing in an older version of NiFi?
The RPG will send data based on what is currently in the incoming queue per transaction. By limiting the size of that queue, you can control the maximum number of FlowFiles that will transfer per transaction. You can set the object back pressure threshold on those incoming queues to limit the number of FlowFiles queued at any given time. This will cause FlowFiles to queue on the next upstream connection in the dataflow. If a source processor is feeding the RPG directly, try putting an UpdateAttribute processor between that source processor and the RPG so you have two connections. As each RPG execution runs and transfers what is on the queue, the queue will be refilled for the next transaction.
Apache NiFi 1.13+ update: In newer releases of NiFi, redistributing FlowFiles within the cluster was made much more efficient and easier through the new load-balanced connection feature. This feature (stable in Apache NiFi 1.13+ versions) is a simple configuration change that can be done on a connection. It supports numerous strategies for redistribution of FlowFiles, but for load-balanced distribution, it offers a true round-robin capability you can't get from an RPG.
06-14-2017
01:59 PM
1 Kudo
@Thierry Vernhet With number 3, I am assuming that every file has a unique filename from which to determine whether the same filename has ever been listed more than once. If that is not the case, then you would need to use DetectDuplicate after fetching the actual data (less desirable, since you will have wasted the resources to potentially fetch the same files twice before deleting the duplicate). Let's assume every file has a unique filename. If so, the DetectDuplicate flow would look like this: with the DetectDuplicate configured as follows: You will also need to add two controller services to your NiFi:
- DistributedMapCacheServer
- DistributedMapCacheClientService
The value associated with the "filename" attribute on the FlowFile is checked against entries in the DistributedMapCacheServer. If the filename does not exist, it is added. If it already exists, the FlowFile is routed to the duplicate relationship. In scenario 2, where filenames may be reused, we need to detect whether the content after fetch is a duplicate or not. In this case the flow may look like this: After fetching the content of a FlowFile, the HashContent processor is used to create a hash of the content and write it to a FlowFile attribute (default is hash.value). The DetectDuplicate processor is then configured to look for FlowFiles with the same hash.value to determine if they are duplicates. FlowFiles whose content hash already exists in the DistributedMapCacheServer are routed to duplicate, where you can delete them if you like. If you found this answer addressed your original question, please mark it as accepted by clicking under the answer. Thanks, Matt
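The two scenarios differ only in the key that is checked against the cache, which a short Python sketch can show. This is not NiFi code: an in-memory set stands in for the DistributedMapCacheServer, `DuplicateDetector` and `content_hash` are hypothetical names, SHA-256 is an assumed hash choice, and the "non-duplicate" label is an assumption for the route that is not named above.

```python
import hashlib


class DuplicateDetector:
    """Remember every key seen so far and flag repeats."""

    def __init__(self) -> None:
        self._seen = set()  # stands in for the DistributedMapCacheServer

    def route(self, cache_key: str) -> str:
        if cache_key in self._seen:
            return "duplicate"
        self._seen.add(cache_key)
        return "non-duplicate"  # assumed label for the other route


def content_hash(content: bytes) -> str:
    """Hash of the content, playing the role of the hash.value attribute."""
    return hashlib.sha256(content).hexdigest()


# Scenario 1: unique filenames, so the filename itself is the cache key.
by_name = DuplicateDetector()
print(by_name.route("myfile.txt"))
print(by_name.route("myfile.txt"))

# Scenario 2: filenames may be reused, so the content hash is the cache key.
by_content = DuplicateDetector()
print(by_content.route(content_hash(b"same bytes")))
print(by_content.route(content_hash(b"same bytes")))
print(by_content.route(content_hash(b"new bytes")))
```

Keying on the filename lets duplicates be dropped before the fetch, while keying on the hash requires the content to be fetched first, which is the extra cost mentioned above.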
06-14-2017
12:30 PM
3 Kudos
@Thierry Vernhet The ListFile processor will list all non-hidden files it sees in the target directory. It then records the latest timestamp of the batch of files it listed in state management. This timestamp is used to determine which new files to list in the next run. Since a file's timestamp changes while it is still being written, the same file will be listed again. A few suggestions, in preferred order, would be:
1. Change how files are being written to this directory. The ListFile processor will ignore any hidden files, so a file being written as ".myfile.txt" will be ignored until the filename is changed to just "myfile.txt".
2. Change the "Minimum File Age" setting on the processor to a high enough value to allow the source system to complete file writes to this directory.
3. Add a DetectDuplicate processor after your ListFile processor to detect duplicate listed files and remove them from your dataflow before the FetchFile processor.
Thanks, Matt
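A simplified Python model of timestamp-based listing shows both the double listing and the effect of a minimum file age. This is a sketch under assumptions, not the ListFile implementation: `list_new_files` is a hypothetical function, and timestamps are plain integers in seconds.

```python
from typing import Dict, List, Tuple


def list_new_files(files: Dict[str, int], last_listed_ts: int, now: int,
                   minimum_file_age: int = 0) -> Tuple[List[str], int]:
    """Return the names listed in this run and the timestamp to store as state."""
    listed = sorted(
        name for name, mtime in files.items()
        if not name.startswith(".")              # hidden files are ignored
        and mtime > last_listed_ts               # newer than the stored state
        and now - mtime >= minimum_file_age      # old enough to be complete
    )
    new_ts = max((files[name] for name in listed), default=last_listed_ts)
    return listed, new_ts


# Without a minimum age, a file that is still being written is listed twice.
files = {"myfile.txt": 100}
first, state = list_new_files(files, last_listed_ts=0, now=101)
files["myfile.txt"] = 105  # the writer appends more data, so the timestamp moves
second, state = list_new_files(files, last_listed_ts=state, now=106)
print(first, second)

# With a 30 second minimum age, the file is only listed once writing has stopped.
early, state2 = list_new_files({"myfile.txt": 100}, 0, now=101, minimum_file_age=30)
late, state2 = list_new_files({"myfile.txt": 105}, state2, now=140, minimum_file_age=30)
print(early, late)
```

The dot-prefix approach from suggestion 1 falls out of the same model, since a hidden name never passes the first filter until it is renamed.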
06-14-2017
12:09 PM
@estefania rabadan There is no processor configuration option to turn off what attributes a processor writes on to a FlowFile it processes. However, you can use the UpdateAttribute processor to remove attributes from FlowFiles. Thanks, Matt
06-13-2017
04:31 PM
@forest lin NiFi at its core has no issues working with very large files. Oftentimes, when you run into OOM, it is because of what you are trying to do with those very large files after they are in NiFi. In the majority of cases, OOM can be avoided via dataflow design and tweaks to the heap size allocated to the NiFi JVM. The content of a FlowFile does not live in heap memory space, but the FlowFile attributes do (except when swapped out to disk in large queues). So avoid extracting large amounts of the content into FlowFile attributes, avoid trying to split very large files into large numbers of small FlowFiles using a single processor, avoid trying to merge a very large number of FlowFiles into a single FlowFile, etc. You can still do these types of things, but you may need to do them in two stages rather than one. For example, split large files by every 5000 lines first, and then split the 5000-line FlowFiles by every line (huge difference in heap usage). If you found this answer addressed your question, please mark it as accepted to close out this thread. Thanks,
Matt
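The two-stage idea can be illustrated with Python generators. This is a rough analogy rather than NiFi internals: the function names are hypothetical, and the peak number of single-line pieces held at once is used as a stand-in for heap pressure.

```python
from typing import Iterable, Iterator, List, Tuple


def split_by_line_count(lines: Iterable[str], lines_per_split: int) -> Iterator[List[str]]:
    """Yield consecutive chunks of at most lines_per_split lines."""
    chunk: List[str] = []
    for line in lines:
        chunk.append(line)
        if len(chunk) == lines_per_split:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def two_stage_split(lines: Iterable[str], first_stage: int = 5000) -> Tuple[int, int]:
    """Split into chunks first, then into single lines; return (total, peak held)."""
    total = 0
    peak = 0
    for chunk in split_by_line_count(lines, first_stage):
        # Second stage: only one chunk's worth of single-line pieces exists at a time.
        singles = list(split_by_line_count(chunk, 1))
        peak = max(peak, len(singles))
        total += len(singles)
    return total, peak


def one_stage_split(lines: Iterable[str]) -> Tuple[int, int]:
    """Split straight into single lines, holding every piece at once."""
    singles = list(split_by_line_count(lines, 1))
    return len(singles), len(singles)


source = [f"row {i}" for i in range(100_000)]
print(two_stage_split(source))
print(one_stage_split(source))
```

Both approaches produce the same number of single-line pieces; only the number held at the same moment differs.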
06-13-2017
12:49 PM
1 Kudo
@forest lin Backpressure is not used to control the data rate in your dataflow. The intent of the backpressure settings on connections is to control the amount of data allowed to queue. Both backpressure settings are "soft" limits. Once backpressure kicks in on a connection, the processor feeding that connection will no longer be allowed to run. So in your case above, you have backpressure set to 5 objects (FlowFiles) or 5 KB of content. Since your queue was empty, no backpressure was being applied when the 37.05 MB FlowFile arrived at your ConvertCSVToAvro processor, so that processor was allowed to run. That one FlowFile was processed and placed on the outbound connection. It is at that time that backpressure kicked in, because you exceeded one of your backpressure settings. The ConvertCSVToAvro processor will now be prevented from running until the queued data drops back below the backpressure thresholds (5 FlowFiles and 5 KB) again. If all your processors are processing FlowFiles rapidly, backpressure will be applied very sparingly. Also keep in mind that, for efficiency, some processors work on batches of FlowFiles. For example, with a backpressure object threshold of 5, you may see a queue with more than 5 FlowFiles. The batch of FlowFiles is placed on the outbound queue. The processor that did the batch processing will then not be allowed to run again until that outbound connection drops below 5 FlowFiles again.
The ControlRate processor allows you to actually control the throughput of a dataflow. It does not slow the processing. The ControlRate processor will allow data to queue on its input side and, based on its configured settings, only allow x number of FlowFiles through over y amount of time. Let's say it is configured to let 5 KB of data through every 1 minute. If you feed it a 37 MB file, it does not transfer just pieces of that FlowFile. It will pass the entire 37 MB FlowFile through and then not allow another FlowFile through until the average data per 1 minute is back down to 5 KB.
Because of how the above works, data could continue to queue in front of ControlRate. This is where backpressure settings become important to stop upstream processors from running. You can set backpressure all the way upstream to your data ingest processors so they stop accepting new FlowFiles. Thanks, Matt
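The "soft limit" behavior can be modeled in a few lines of Python. This is a sketch under assumptions, not NiFi's scheduler: `Connection` and `run_processor` are hypothetical names, and the only rule modeled is that the thresholds are checked before a processor runs, never while it is writing its output.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Connection:
    object_threshold: int
    size_threshold_bytes: int
    queued_sizes: List[int] = field(default_factory=list)

    def backpressure_applied(self) -> bool:
        """True once either the object or the size threshold has been reached."""
        return (len(self.queued_sizes) >= self.object_threshold
                or sum(self.queued_sizes) >= self.size_threshold_bytes)


def run_processor(outbound: Connection, output_sizes: List[int]) -> bool:
    """Queue the output unless backpressure is already applied; return whether it ran."""
    if outbound.backpressure_applied():
        return False  # the feeding processor is not allowed to run
    outbound.queued_sizes.extend(output_sizes)  # a whole result is queued, even if large
    return True


# Thresholds from the post: 5 objects or 5 KB. The queue starts out empty.
conn = Connection(object_threshold=5, size_threshold_bytes=5 * 1024)
big_flowfile = int(37.05 * 1024 * 1024)

print(run_processor(conn, [big_flowfile]))  # runs, because the queue was empty
print(conn.backpressure_applied())          # the size threshold is now exceeded
print(run_processor(conn, [100]))           # blocked until the queue drains

# A processor that works in batches can overshoot the object threshold.
batch_conn = Connection(object_threshold=5, size_threshold_bytes=10**12)
run_processor(batch_conn, [1] * 20)
print(len(batch_conn.queued_sizes))
```

Once the downstream processor drains the queue below both thresholds, `run_processor` would be allowed to run again in this model.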