Member since 07-30-2019 | 3470 Posts | 1641 Kudos Received | 1018 Solutions
05-23-2024
05:44 AM
@Sofia71 Sharing the specific processors used in your dataflow, and the configuration of some of them (including your MergeContent processor), may help clarify your specific setup. You are using a ConsumeKafka processor to consume messages (multi-record content) from a Kafka topic. I am assuming that each consumed message contains only two records, so the SplitRecord processor produces only 2 FlowFiles for every FlowFile it splits? Immediately after the SplitRecord processor you have configured the "Round Robin" load-balancing strategy on the outbound connection. This is probably going to be your first issue. Each node in a NiFi cluster runs its own identical copy of the flow against only the FlowFiles present on that specific node. A node has no access to, and no ability to read, FlowFiles present on other nodes. So if one FlowFile produced by splitting a record is on Node1 and the other FlowFile is on Node2, the downstream MergeContent will not be able to merge them back together. So the first question is whether you even need load-balancing on the connection, since you are consuming your messages from a Kafka topic:
1. How many nodes are in your NiFi cluster?
2. How many partitions are on the Kafka topic from which you are consuming?
The ConsumeKafka processor uses a "Group ID" to identify a consumer group, so every node in your NiFi cluster that is running this ConsumeKafka processor is a member of the same consumer group. Let's assume your source Kafka topic has 3 partitions and your NiFi cluster has 3 nodes. In that case each node's ConsumeKafka is assigned one of those partitions, which means each node consumes a unique set of messages from the topic. So there is no need to load balance.
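To make the distribution problem concrete, here is a minimal Python sketch (not NiFi's actual implementation) that contrasts Round Robin with an attribute-based assignment in the spirit of "Partition by attribute". The three node names, the hypothetical `record.id` attribute shared by the two splits of one record, and the use of `zlib.crc32` as the hash are all illustrative assumptions.

```python
import zlib
from itertools import cycle

NODES = ["node1", "node2", "node3"]  # hypothetical 3-node cluster


def round_robin(flowfiles):
    """Assign each FlowFile to the next node in turn, ignoring its attributes."""
    nodes = cycle(NODES)
    return {ff["name"]: next(nodes) for ff in flowfiles}


def partition_by_attribute(flowfiles, attr):
    """Send FlowFiles with the same attribute value to the same node (hash of the value)."""
    return {
        ff["name"]: NODES[zlib.crc32(ff[attr].encode("utf-8")) % len(NODES)]
        for ff in flowfiles
    }


# Two source records, each split into two FlowFiles that share a record.id value.
splits = [
    {"name": "A-1", "record.id": "A"},
    {"name": "A-2", "record.id": "A"},
    {"name": "B-1", "record.id": "B"},
    {"name": "B-2", "record.id": "B"},
]

rr = round_robin(splits)
pa = partition_by_attribute(splits, "record.id")
print(rr)  # A-1 -> node1, A-2 -> node2: the pair ends up on different nodes
print(pa)  # A-1 and A-2 always land on the same node
```

Round Robin sends A-1 and A-2 to different nodes, so neither node's MergeContent ever sees both halves; hashing on a shared attribute value keeps each pair on one node.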
Assuming the above is not what you are doing, the proper load-balancing strategy to use would be "Partition by attribute", which uses an attribute on the FlowFile to make sure that FlowFiles with the same attribute value get sent to the same node.

Now on to the MergeContent processor. Upon execution, MergeContent reads from the inbound connection queue and starts assigning FlowFiles to bins. It does not search the inbound connection for matches; it simply reads FlowFiles in the order listed and works its way down the list. The first FlowFile is allocated to a bin; if the next FlowFile can't be allocated to the same bin, it is placed in a second bin, and so on. If every bin has a FlowFile allocated to it and the next FlowFile does not belong to any of those bins, MergeContent force-merges the oldest bin to free up a bin for the new FlowFile. There is no way to change this behavior, as the processor is not designed to parse through all FlowFiles in the connection looking for matches before allocating them to bins; that would not have very good performance characteristics. What is your concern with increasing the number of bins?

This might also be a use case for the Wait/Notify processors. After you split the record into two FlowFiles, is one FlowFile routed to an InvokeHTTP processor for further attribute enrichment while the other FlowFile is routed directly to MergeContent? If so, the FlowFiles that don't get additional processing will queue up at MergeContent much sooner. But if you add a Notify processor after the InvokeHTTP processor and a Wait processor before MergeContent in the other FlowFile path, you could control the release of FlowFiles to the MergeContent processor. This is just one suggestion you could try, but I would start by making sure you are handling the distribution of your split FlowFiles correctly. Please help our community grow.
If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
05-23-2024
04:39 AM
@alan18080 Please provide more details about the steps you are performing and the exact versions of Apache NiFi and Apache NiFi-Registry you are using. So you have an existing NiFi-Registry configured to use Postgres as its metadata database. You have already version-controlled some NiFi process group(s) to buckets within this registry. And now you are trying to export those version-controlled process groups from this NiFi-Registry and import them into another NiFi-Registry? What versions are the two NiFi-Registries? What steps did you perform that eventually resulted in the exception you encountered? I am not clear on what you mean by "transfer". Sharing the complete error and stack trace would also be helpful. Thank you, Matt
05-21-2024
12:19 PM
@Racketmojster NiFi passes FlowFiles from processor component to processor component. A directory is nothing more than metadata, not actual content in itself; it is a path leading (hopefully) to data. So there is no NiFi processor specifically for generating a FlowFile from a discovered parent directory path. But as I said in my original post, you can use a dataflow consisting of:
ListFile --> UpdateAttribute (to extract the date from the absolute.path FlowFile attribute into a new attribute, for example "date.path") --> DetectDuplicate (configured with "${date.path}" as the "Cache Entry Identifier" value) --> ReplaceText (optional: only needed if the FlowFile content must contain the directory path to pass to your script. Ideally this is not needed if you can just pass the FlowFile's ${date.path} attribute value to your script; I don't know what input your script expects) --> ExecuteStreamCommand
So while the flow may list all files from the same date path, DetectDuplicate de-duplicates them so that only one FlowFile from each unique date path is passed on to your ExecuteStreamCommand processor, which can then execute your script against that source directory. Thank you, Matt
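A minimal Python sketch of the de-duplication logic in that flow, assuming (hypothetically) that each file sits under a directory named like `2024-05-21` and that the `date.path` value is the directory path up to and including that date. It only models what ListFile, UpdateAttribute, and DetectDuplicate would pass on; it does not call NiFi.

```python
import re

# Hypothetical layout: files live under dated directories such as
# /data/incoming/2024-05-21/; adjust the pattern to your real source tree.
DATE_DIR = re.compile(r"/(\d{4}-\d{2}-\d{2})/")


def unique_date_paths(absolute_paths):
    """Return one date directory per unique date, in first-seen order.

    Models ListFile (absolute.path) -> UpdateAttribute (derive date.path)
    -> DetectDuplicate (Cache Entry Identifier = ${date.path}).
    """
    seen = set()
    result = []
    for path in absolute_paths:
        match = DATE_DIR.search(path)
        if match is None:
            continue  # no date directory in this path; skipped in this sketch
        date_path = path[: match.end()]  # directory up to and including the date
        if date_path not in seen:  # only the first occurrence passes as non-duplicate
            seen.add(date_path)
            result.append(date_path)
    return result


# absolute.path values as ListFile might emit them, one per listed file.
listed = [
    "/data/incoming/2024-05-20/",
    "/data/incoming/2024-05-20/",
    "/data/incoming/2024-05-21/sub/",
    "/data/incoming/2024-05-21/",
    "/data/incoming/misc/",
]
print(unique_date_paths(listed))
# ['/data/incoming/2024-05-20/', '/data/incoming/2024-05-21/']
```

Each returned path is what would be handed to the script via ExecuteStreamCommand, so the script reads the files straight from the source directory.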
05-21-2024
11:33 AM
@Racketmojster So you do want to ingest all the files within a specific parent directory path. Use case details are important here; I was not sure whether your script fetched the files itself when passed a folder date string. There are certainly challenges with using the MergeContent processor, which may require modifications to your script to make it work. MergeContent provides numerous Merge Formats that control how this processor merges the content from multiple source FlowFiles.

The default is binary concatenation, which simply appends the bytes of one FlowFile to the end of the previous FlowFile's content. You could specify a delimiter to keep track of each section's filename and to mark where one file's bytes start and end. This would require your script to parse the binary-concatenated content, split the files by delimiter, and obtain the individual filenames from the delimiters. This is rather messy.

You could use the ZIP or TAR formats to merge the files into a TAR or ZIP file that retains the directory structure and filenames. Your script would then need to untar or unzip the content in order to process the individual files within the bundle. This is a less messy option.

Once you have decided on the merge format, you need to configure MergeContent so that all the files from within a unique parent directory (unique date directory) are merged with one another. This is where the "Correlation Attribute Name" property is used: all source FlowFiles with an identical value in the configured FlowFile attribute are placed in the same MergeContent bin. For example, you might use "absolute.path" in this property, but you'll need to make sure this works for your use case, since I don't know what your complete source directory tree looks like.
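If you go with the TAR option, the script side could look roughly like this minimal Python sketch using the standard `tarfile` module. `build_bundle` only fabricates a small in-memory bundle so the example runs on its own; the exact member names MergeContent writes into its TAR output are an assumption here and should be checked against your actual merged FlowFiles.

```python
import io
import tarfile


def build_bundle(files):
    """Create an in-memory TAR from {relative_path: bytes} (demo stand-in for merged content)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


def read_bundle(bundle):
    """What the downstream script would do: walk the members, keeping each filename."""
    contents = {}
    with tarfile.open(fileobj=io.BytesIO(bundle), mode="r") as tar:
        for member in tar.getmembers():
            if member.isfile():
                contents[member.name] = tar.extractfile(member).read()
    return contents


bundle = build_bundle({"2024-05-21/a.csv": b"1,2\n", "2024-05-21/b.csv": b"3,4\n"})
contents = read_bundle(bundle)
print(sorted(contents))  # ['2024-05-21/a.csv', '2024-05-21/b.csv']
```

Reading members through `extractfile` keeps the data in memory, so the script does not have to write the unpacked files back to disk.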
You could also use an UpdateAttribute processor before MergeContent to create a new FlowFile attribute containing just the date string extracted from absolute.path, and then use that new attribute as your correlation attribute.

Next, you need to make sure MergeContent does not merge a bin before all expected FlowFiles have been added to it. When executed, the MergeContent processor reads FlowFile(s) from the inbound connection and allocates it/them to a bin. After allocating a FlowFile to a bin, it checks whether that bin is eligible to be merged. Since NiFi by default executes processors as fast as possible (milliseconds count here), it is possible that at the very moment it looks at the inbound connection, not all FlowFiles from a specific directory are present yet. Whether a bin should be merged is controlled by the "Minimum Number of Entries", "Minimum Group Size", and "Max Bin Age" configuration properties. If both minimum settings are satisfied, the bin is merged. If the minimums have not both been met, but the max bin age (the age of the bin since the first FlowFile was allocated to it) has been reached or exceeded, the bin is merged anyway. With the defaults, you can see how easily a bin may get merged before all necessary FlowFiles have been added to it.

The "Maximum number of Bins" property is also very important here. If you have 5 bins (the default) and 20 source date directories, FlowFiles may get allocated by the correlation attribute to 5 different bins, and then another FlowFile is processed that belongs to none of those 5 bins (a 6th date directory). In this scenario, MergeContent forces the merge of the oldest bin, regardless of the minimums or max bin age, in order to free a bin for the next FlowFile. If all your unique source date directories contain the same number of files, setting the minimum number of entries is easy, but that is probably not the case.
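The allocation rules described above can be modeled with a small Python simulation. This is a simplified sketch, not MergeContent's source: it tracks only "Minimum Number of Entries", "Max Bin Age", and "Maximum number of Bins", ignores "Minimum Group Size", and uses made-up directory names `d1` to `d6` with two files each, arriving interleaved.

```python
from dataclasses import dataclass, field


@dataclass
class Bin:
    key: str
    created: float  # time the first FlowFile was allocated to this bin
    entries: list = field(default_factory=list)


def run_merge(queue, max_bins=5, min_entries=1, max_bin_age=300.0):
    """Simplified model of MergeContent bin allocation.

    queue: (arrival_time, correlation_value, name) tuples in queue order.
    Returns (reason, correlation_value, names) for every bin that gets merged.
    """
    bins = {}  # insertion-ordered, so the first key is always the oldest bin
    merged = []
    for now, key, name in queue:
        # Max Bin Age: merge bins that have been open too long.
        for k in [k for k, b in bins.items() if now - b.created >= max_bin_age]:
            merged.append(("max_bin_age", k, bins.pop(k).entries))
        if key not in bins:
            if len(bins) >= max_bins:
                # No free bin: force-merge the oldest one regardless of minimums.
                oldest = next(iter(bins))
                merged.append(("forced", oldest, bins.pop(oldest).entries))
            bins[key] = Bin(key, now)
        bins[key].entries.append(name)
        # Minimum Number of Entries reached: the bin is eligible to merge.
        if len(bins[key].entries) >= min_entries:
            merged.append(("min_entries", key, bins.pop(key).entries))
    return merged


# Six hypothetical date directories, two files each, interleaved in the queue.
dirs = [f"d{n}" for n in range(1, 7)]
queue = []
t = 0.0
for i in (1, 2):
    for d in dirs:
        queue.append((t, d, f"{d}-file{i}"))
        t += 1.0

forced = run_merge(queue, max_bins=5, min_entries=100)
print(forced[0])  # ('forced', 'd1', ['d1-file1'])
aged = run_merge(queue + [(400.0, "d7", "d7-file1")], max_bins=6, min_entries=100)
```

With 5 bins and 6 interleaved directories, every bin is force-merged holding a single file; with 6 bins and a minimum number of entries larger than any directory, each bin waits for Max Bin Age and merges with both of its files. With the default minimum of 1 entry, every FlowFile is merged on its own immediately.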
So what a user would typically do is set the minimum number of entries to a value larger than any source directory would ever have, which prevents a bin from merging until the max bin age is reached. This introduces latency into your dataflow equal to the configured max bin age. Hope this helps you understand the MergeContent processor configuration options better within the context of your specific dataflow needs.

That said, the above does not seem to me like an efficient dataflow. ListFile lists files from the local filesystem, and FetchFile fetches the content for those listed files and adds it to the FlowFile. If you then use MergeContent to create a TAR or ZIP, your script will need to untar or unzip that bundle somewhere to process the files. So you are effectively reading content from a directory (a write to NiFi's content repository), then creating a TAR/ZIP (another write to NiFi's content repository), and then your script untars/unzips the bundle (another write to somewhere local) in order to process the files. That is why in my original response I suggested avoiding FetchFile and using only ListFile to get the unique date source directory, then passing that absolute path to your script so it processes the files directly out of the source path. This reduces latency and a lot of disk I/O. Thank you, Matt
05-21-2024
10:38 AM
@udayAle Please start a new community question for your unrelated follow-up question above. Responses to an unrelated question in this thread will confuse other community members who may be having similar problems. You can use @<username> to notify specific people about new community questions. Thank you, Matt
05-20-2024
02:18 PM
2 Kudos
@manishg How many CPU cores does each of your NiFi hosts have? A load average of 1 means you are using 100% of 1 CPU core on average; 20 means you are using 100% of 20 cores on average, and so on. So let's say your node has 8 cores but your load average is higher than 8: this means your CPU is saturated and is being asked to perform more work than it can handle efficiently. This leads to long thread execution times and can interfere with heartbeats being sent by nodes, or processed by the elected cluster coordinator, in a timely manner. Often this is triggered by too many concurrent tasks on high-CPU-usage processors, high FlowFile volume, etc. You can ultimately design a dataflow that simply needs more CPU than you have in order to work at the throughput you need. Users commonly just configure more and more concurrent tasks and set the Max Timer Driven thread pool way too high for the number of cores available on a node. This allows more threads to execute concurrently, but it just results in each thread taking longer to complete as their time is sliced on the CPU: thread 1 gets some time on CPU 1 and then waits while another thread gets some time, and eventually thread 1 gets a bit more time. For threads that complete in milliseconds that is not a big deal, but for CPU-intensive processors it can cause issues. Let's say you have numerous CPU-intensive threads executing at the same time when the heartbeat is scheduled: the heartbeat thread is now waiting in line for time on the CPU. Sometimes an alternate dataflow design that uses less CPU can be used. Sometimes you can add more nodes. Sometimes you need to move some dataflows to a different cluster. Sometimes you just need more CPU. Thank you, Matt
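A quick way to apply the cores-versus-load-average rule of thumb on a host is sketched below in Python, using `os.getloadavg()` (Unix-only) and `os.cpu_count()`. Treating a ratio above 1.0 as saturation is the simplification described above; on Linux the load average also counts tasks blocked on I/O, so treat it as a signal rather than an exact CPU measurement.

```python
import os


def cpu_saturation(load_average, cores):
    """Return load average divided by core count; above 1.0 suggests more work than cores."""
    if cores <= 0:
        raise ValueError("cores must be positive")
    return load_average / cores


def check_host():
    """Print this host's 5-minute load average relative to its core count."""
    _, five_min, _ = os.getloadavg()  # (1, 5, 15 minute averages); Unix-only
    cores = os.cpu_count() or 1  # cpu_count() can return None
    ratio = cpu_saturation(five_min, cores)
    status = "saturated" if ratio > 1.0 else "ok"
    print(f"5-min load {five_min:.2f} on {cores} cores -> {ratio:.2f} ({status})")


# The example from the text: an 8-core node with a load average of 12.
print(cpu_saturation(12.0, 8))  # 1.5

if hasattr(os, "getloadavg"):
    check_host()
```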
05-20-2024
02:01 PM
1 Kudo
Hello @hegdemahendra It is always very helpful if you include the exact version of Apache NiFi, Cloudera HDF, or Cloudera CFM being used. My guess here would be one or both of the following:
1. You have multiple FlowFiles, all pointing at the same content claims, queued in connections within your dataflow(s) on the canvas. As long as a FlowFile exists on the canvas, it will exist in the flowfile_repository. Users should avoid leaving FlowFiles queued in connections in NiFi; some users tend to allow FlowFiles to accumulate at stopped processor components rather than auto-terminate them. Even if a FlowFile does not have any content, its FlowFile attributes/metadata still consume disk space.
2. You are extracting content from your FlowFiles into FlowFile attributes, resulting in large FlowFile attributes/metadata being stored in the flowfile_repository. Dataflow designers should avoid extracting large amounts of FlowFile content into the FlowFile's attributes. Instead, try to build dataflows with components that read from the FlowFile's content rather than from FlowFile attributes.
Thank you, Matt
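To get a feel for the second cause, this rough Python estimate sizes FlowFile attributes as UTF-8 key and value bytes. It ignores the repository's real serialization overhead, and the attribute names, the 50,000-character extracted payload, and the 100,000 queued FlowFiles are hypothetical numbers chosen only for illustration.

```python
def attribute_bytes(attributes):
    """Approximate a FlowFile's attribute footprint as UTF-8 key + value bytes.

    Ignores the flowfile_repository's real serialization overhead.
    """
    return sum(len(k.encode("utf-8")) + len(v.encode("utf-8")) for k, v in attributes.items())


# Hypothetical attribute sets: typical metadata vs. content extracted into an attribute.
small = {"filename": "data.json", "mime.type": "application/json"}
large = {**small, "extracted.payload": "x" * 50_000}

queued = 100_000  # hypothetical number of FlowFiles left sitting in connections
for label, attrs in (("small", small), ("large", large)):
    total_mb = attribute_bytes(attrs) * queued / 1_000_000
    print(f"{label}: {attribute_bytes(attrs)} bytes each, ~{total_mb:,.1f} MB for {queued:,} queued")
```

The point is the multiplication: a large attribute is paid for by every queued FlowFile that carries it, on top of any content.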
05-20-2024
01:38 PM
@galt @RAGHUY Let me add some corrections/clarity to the accepted solution:

"Export and Modify Flow Configuration: Export the NiFi flow configuration, typically in XML format. This can be done via the NiFi UI or by utilizing NiFi's REST API. Then, manually adjust the XML to change the ID of the connection to the desired value."

It is not clear what is being done here. The only way to export a flow configuration from NiFi in XML format is by generating a NiFi template (deprecated, and removed in Apache NiFi 2.x versions). Even if you were to generate a template and export it via the NiFi UI or NiFi's REST API, modifying it will not change what is on the canvas. Suppose you modified the connection component's UUID in all places in the template: upon uploading that template back into NiFi, you would need to drop the template onto the canvas, which would result in every component in that template getting a new UUID. So this does not work. Newer versions of NiFi (1.18+) support newer flow definitions in JSON format, but the same issue persists when using flow definitions in this manner.

In a scenario like the one described in this post, where the user removed a connection by mistake and then re-created it, the best option is to restore/revert the previous flow. Whenever a change is made to the canvas, NiFi auto-archives the current flow.xml.gz (legacy) and flow.json.gz (current) files into an archive sub-directory and generates new flow.xml.gz/flow.json.gz files. The best and safest approach is to shut down all nodes in your NiFi cluster, navigate to the NiFi conf directory, and swap the current flow.xml.gz/flow.json.gz files with the archived flow.xml.gz/flow.json.gz files that still contain the connection with the original needed ID. When that is not possible (maybe the change went unnoticed for too long and all archived versions have the new connection UUID), you need to manually modify the flow.xml.gz/flow.json.gz files.
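The manual UUID edit can be scripted. The following minimal Python sketch, intended to run only while NiFi is stopped on that node, backs up a gzipped flow file, refuses to proceed if the original UUID is already present or the bad UUID is absent, and then does a plain text replacement. It is an assumption that a text-level replace is sufficient for your flow (it treats flow.xml.gz and flow.json.gz identically as text); the demo file is a toy stand-in, not a real flow definition, and `bad-uuid`/`orig-uuid` are placeholders.

```python
import gzip
import shutil
import tempfile
from pathlib import Path


def swap_uuid(flow_file, bad_uuid, original_uuid):
    """Replace every occurrence of bad_uuid with original_uuid in a gzipped flow file.

    Only run this while NiFi is stopped on the node. Writes <file>.bak first and
    returns the number of occurrences replaced.
    """
    path = Path(flow_file)
    backup = path.with_name(path.name + ".bak")
    shutil.copy2(path, backup)  # keep an untouched copy before editing

    with gzip.open(path, "rt", encoding="utf-8") as f:
        text = f.read()

    # Mirrors "search each file for the original UUID to make sure it does not exist".
    if original_uuid in text:
        raise RuntimeError(f"{original_uuid} already present in {path}; not modifying")
    count = text.count(bad_uuid)
    if count == 0:
        raise RuntimeError(f"{bad_uuid} not found in {path}")

    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write(text.replace(bad_uuid, original_uuid))
    return count


# Self-contained demo on a throwaway toy file (not a real NiFi flow definition).
demo_dir = Path(tempfile.mkdtemp())
demo = demo_dir / "flow.json.gz"
with gzip.open(demo, "wt", encoding="utf-8") as f:
    f.write('{"connections": [{"identifier": "bad-uuid"}], "ref": "bad-uuid"}')

replaced = swap_uuid(demo, "bad-uuid", "orig-uuid")
print(f"replaced {replaced} occurrence(s)")
```

Run it against both flow.xml.gz and flow.json.gz on the node you edit, then distribute the files or let the other nodes inherit the flow as described in the options.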
Shut down all your NiFi nodes to avoid any changes being made on the canvas while performing the following steps.

Option 1:
1. Make a backup of the current flow.xml.gz and flow.json.gz.
2. Search each file for the original UUID to make sure it does not exist.
3. On one node, manually modify the flow.xml.gz and flow.json.gz files by locating the current bad UUID and replacing it with the original needed UUID.
4. Copy the modified flow.xml.gz and flow.json.gz files to all nodes in the cluster, replacing the original files. This is possible since all nodes run the same version of the flow.

Option 2:
1-3. Same as Option 1.
4. Start NiFi only on the node where you modified the flow.xml.gz and flow.json.gz files.
5. On all other (still stopped) nodes, remove or rename the flow.xml.gz and flow.json.gz files.
6. Start all the remaining nodes. Since they do not have a flow.xml.gz or flow.json.gz to load, they will inherit the flow from the cluster as they join it.

NOTE: The flow.xml.gz was replaced by the newer flow.json.gz format starting with Apache NiFi 1.16. When NiFi 1.16 or newer is started and has only a flow.xml.gz file, it loads from flow.xml.gz and then generates the new flow.json.gz format. Apache NiFi 1.16+ loads only from flow.json.gz on startup when that file exists, but still writes out both the flow.xml.gz and flow.json.gz formats any time a change is made to the canvas. With Apache NiFi 2.x versions, the flow.xml.gz format will go away. Thank you, Matt
05-20-2024
12:51 PM
@jpconver2 @RAGHUY No 1.x version of NiFi supports rolling upgrades. With the major release of the NiFi 2.x versions, NiFi added rolling upgrade support as part of NIFI-12016 - Improve leniency for bundle compatibility to allow for rolling upgrades. That Apache NiFi Jira does a great job of explaining why this was historically not implemented in the NiFi 1.x branch. This new feature improvement is included in NiFi 2.0.0-M1 (NiFi 2.0.0 milestone 1) and newer. Thank you, Matt
05-20-2024
12:36 PM
@SAMSAL This is not a new problem, but rather something that has existed with NiFi on Windows for a very long time. You'll need to avoid using spaces in directory names, or wrap the directory name in quotes, to avoid the issue. See NIFI-200 - Bootstrap loader doesn't handle directories with spaces in it on Windows. Thank you, Matt