Member since: 07-30-2019
Posts: 3400
Kudos Received: 1621
Solutions: 1003
01-16-2018
06:10 PM
1 Kudo
@Chris Lundeberg It may be helpful to share your MergeContent processor's configuration here.
1. How many bins is the processor configured to use?
2. It sounds like each incoming FlowFile may have a considerable attribute map size. All the attributes of the FlowFiles being merged are held in heap memory until the merge is complete, so you may be running into heap issues. Have you seen any Out of Memory errors in the nifi-app.log?
3. What correlation attribute are you using to bin like FlowFiles?
4. How large is each FlowFile being merged? If they are very small (meaning it would take more than 20,000 of them to reach a 64 MB merged file), you may want to use multiple MergeContent processors in series to reduce heap usage.
Useful links:
https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.html
https://community.hortonworks.com/questions/87178/merge-fileflow-files-based-on-time-rather-than-siz.html
I have not personally seen FlowFiles routed to Failure lose their attributes; that seems very odd to me. The merged FlowFile, depending on configuration, may have different attributes however. I am assuming that your "avro_schema" attribute may be fairly large. It may be better to use something smaller for your correlation attribute value in the MergeContent processor. You could use the ExtractAvroMetadata processor before the MergeContent processor; it will give you a "schema.fingerprint" attribute you could use instead to accomplish the same thing. Are you putting "${avro_schema}_${tablename}" in the MergeContent processor's Correlation Attribute Name property value? What this property does is resolve the provided EL above to its actual value and then check each incoming FlowFile for an attribute with that resolved name. FlowFiles whose values for that attribute match are placed in the same bin. I just want to make sure you are using this property correctly. All FlowFiles that do not have the attribute are allocated to a single bin. You also need to make sure your MergeContent processor is configured with enough bins (number of needed bins + 1) to accommodate all the possible unique correlation attribute values. If you do not have enough bins, MergeContent will force the merging of the oldest bin in order to free a bin so it can continue allocating additional FlowFiles. Thank you, Matt
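For illustration only, here is a minimal Python sketch (not NiFi's implementation) of the binning behavior described above: FlowFiles are grouped by the value of the correlation attribute, and any FlowFile lacking that attribute lands in a single shared bin. The attribute values below are hypothetical.

```python
# Minimal sketch of correlation-attribute binning (illustration only,
# not NiFi's implementation). Each "FlowFile" is just a dict of attributes.
from collections import defaultdict

def bin_flowfiles(flowfiles, correlation_attribute):
    """Group FlowFiles by the value of the correlation attribute.

    FlowFiles that do not carry the attribute all land in one shared bin,
    mirroring the MergeContent behavior described above.
    """
    bins = defaultdict(list)
    for ff in flowfiles:
        key = ff.get(correlation_attribute, "<no-correlation-attribute>")
        bins[key].append(ff)
    return bins

# Hypothetical FlowFiles; "schema.fingerprint" is the attribute written by
# ExtractAvroMetadata and is much smaller than a full "avro_schema" attribute.
flowfiles = [
    {"schema.fingerprint": "abc123", "tablename": "orders"},
    {"schema.fingerprint": "abc123", "tablename": "orders"},
    {"schema.fingerprint": "def456", "tablename": "customers"},
    {"tablename": "misc"},  # missing the attribute -> shared bin
]

for key, group in bin_flowfiles(flowfiles, "schema.fingerprint").items():
    print(key, len(group))
```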
01-16-2018
05:07 PM
@Eric Lloyd With the above configuration, it would only take 1 FlowFile to be assigned to a bin before that bin was marked eligible for merging. There is nothing there that forces the processor to wait for other FlowFiles to be allocated to a bin before merging, since both minimums are set to 1 FlowFile and 0 bytes. In order to actually get 100,000 FlowFiles in a single merge (this is high and may trigger OOM), there would need to be 100,000 FlowFiles all with the same correlation attribute value in the incoming connection queue at the time the processor runs. This is almost certainly not going to be the case. The Max Bin Age simply sets an exit strategy here: it will merge a bin, regardless of whether the minimums have been met, once the bin has reached this age. You may want to set more reasonable values for your minimums and also consider using multiple MergeContent processors in series to step up to the final merged number you are looking for. Thanks, Matt
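As a rough, purely hypothetical illustration of merging in series (the numbers below are not from this thread), two MergeContent processors can reach 100,000 source FlowFiles per final file while holding far fewer FlowFile attribute maps in heap for any one bin:

```python
# Hypothetical two-stage merge: values are illustrative, not a recommendation.
stage1_entries = 1_000   # FlowFiles merged per bundle by the first MergeContent
stage2_entries = 100     # bundles merged by the second MergeContent

final_flowfiles_per_merge = stage1_entries * stage2_entries
print(final_flowfiles_per_merge)  # 100000 source FlowFiles per final file

# Peak FlowFiles whose attributes sit in heap for a single bin at one time:
peak_binned = max(stage1_entries, stage2_entries)
print(peak_binned)  # 1000, versus 100000 for a single-stage merge
```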
01-16-2018
01:38 PM
1 Kudo
@Roger Young The Remote Process Group (RPG) is not designed for dynamic target URL assignment. It is designed to communicate with a target standalone NiFi or NiFi cluster. During that communication it learns about all currently connected nodes in the target NiFi cluster and retains the URLs for all of those nodes so it can perform load-balanced delivery of data. In the event the RPG cannot get an updated listing from the target, it will continue trying to deliver to the last known set of target nodes. Since the RPG was never intended to be used to deliver data to multiple independent target NiFi instances, the idea of a dynamic URL was never considered. There are other NiFi processors, such as PostHTTP and InvokeHTTP, that can take NiFi Expression Language (EL) as input for the target URL. Thank you, Matt
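To illustrate the kind of per-FlowFile URL resolution EL enables (the attribute names here are hypothetical examples, not from this thread), a target URL such as http://${target.host}:${target.port}/contentListener resolves differently for each FlowFile. A simplified Python sketch of that resolution:

```python
# Rough sketch of how EL like "http://${target.host}:${target.port}/..."
# resolves per FlowFile (attribute names are hypothetical; real NiFi EL
# also supports functions, defaults, and more).
import re

def resolve_el(template: str, attributes: dict) -> str:
    # Replace each ${attr} with that FlowFile's attribute value.
    return re.sub(r"\$\{([^}]+)\}", lambda m: attributes.get(m.group(1), ""), template)

template = "http://${target.host}:${target.port}/contentListener"
print(resolve_el(template, {"target.host": "node1.example.com", "target.port": "8080"}))
print(resolve_el(template, {"target.host": "node2.example.com", "target.port": "9090"}))
```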
01-15-2018
09:34 PM
1 Kudo
@dhieru singh NiFi offers a "Summary" UI which has a connections tab you can select. Once selected, you can sort on the "Queue / Size threshold" column by clicking on the word "Queue". This will move the connections with the largest threshold percentage to the top of the list. 100% for the Queue threshold indicates that object threshold back pressure is being applied on that connection. 100% for the Size threshold indicates that queue size back pressure is being applied on that connection. Clicking on the arrow to the far right of the row will take you directly to that connection on your canvas, no matter which process group it exists within. Thank you, Matt If you found this answer to be helpful in addressing your question, please take a moment to click the accept link below.
11-22-2017
04:10 PM
@Pratik Ghatak The NiFi instance/cluster with the Remote Process Group (RPG) is acting as the client, and the target NiFi instance/cluster is acting as the server in this Site-To-Site (S2S) connection. The target NiFi configuration you shared shows that you have S2S set up to support both the RAW and HTTP transport protocols. It also appears from your screenshots that the initial connection between your two NiFis is working correctly. The error you are seeing indicates that you have not added any remote input ports on the target NiFi's canvas to receive FlowFiles from the source NiFi. On the target NiFi, you will need to add one or more "input ports". Input ports added at the root/top level process group are considered "remote input ports" and can be used to receive data over S2S. After you add these remote input ports to your target NiFi's canvas, you can right-click on your source NiFi RPG and select "refresh" from the context menu that appears (or you can just wait for the next auto-refresh, which occurs every 30 seconds). You should then be able to see those remote input ports when you drag a connection to the RPG. Thank you, Matt If you find this information has addressed your question/issue, please take a moment to click "Accept" beneath the answer.
11-22-2017
03:58 PM
1 Kudo
@Mohamed Hossam You could use the ReplaceText processor instead of your script to accomplish what you are trying to do. The above ReplaceText processor will create 4 capture groups for the desired columns from your input FlowFiles. It will even work against incoming FlowFiles that have multiple entries (1 per line). Thank you, Matt If you find this answer addresses your question/issue, please take a moment to click "Accept" beneath the answer.
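The configured regex appears only in the screenshot referenced above, so purely as a hypothetical illustration of the capture-group approach (assuming comma-separated input and that four specific columns are wanted, applied line by line much like ReplaceText's "Line-by-Line" evaluation mode), the idea looks like this in Python:

```python
# Hypothetical 4-capture-group search/replace applied per line.
# The column layout and delimiters below are assumptions for illustration.
import re

search = r"^([^,]*),([^,]*),[^,]*,([^,]*),[^,]*,([^,]*)$"
replacement = r"\1|\2|\3|\4"

sample = "a1,b1,skip,c1,skip,d1\na2,b2,skip,c2,skip,d2"
print(re.sub(search, replacement, sample, flags=re.MULTILINE))
# a1|b1|c1|d1
# a2|b2|c2|d2
```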
11-22-2017
03:15 PM
@Lanic All of NiFi executes within a single JVM, and it is not possible to kill an individual thread within that JVM. Unfortunately, a restart is required to clear hung threads. Hopefully you have ironed out any misconfiguration in development that led to hung threads before deploying to production. I understand that anytime a restart is needed it can be very annoying. Thanks, Matt
A few Hortonworks community forum tips:
1. Use "@<username>" in responses so that user gets pinged; otherwise, the user may never know you asked a follow-up question.
2. Avoid starting a new "Answer" to respond to an existing answer. Instead click "Add comment". This makes following a thread/line of thought easier.
3. When an "Answer" addresses/solves your question, please select "Accept" beneath that answer. This encourages user participation in this forum.
11-22-2017
03:06 PM
2 Kudos
@sally sally If you do not know the exact number of files you expect to merge, you must consider FlowFile latency. Consider the above MergeContent processor configuration:
1. "Correlation Attribute Name" <-- This property determines which bin each inbound queued FlowFile is assigned to. (You need to make sure you have enough bins to accommodate the number of unique filenames you expect to deal with.)
2. "Minimum Number of Entries" <-- This is the expected minimum number of FlowFiles to be allocated to a bin before it is considered eligible to be merged. (Since you do not know how many files will have the same filename (2,000+), I set this to 10,000.) NiFi will continue to add FlowFiles with the same filename to the bin until 10,000 is reached or the bin has existed for the "Max Bin Age".
3. "Maximum Number of Entries" <-- If the inbound queue holds more than 20,000 FlowFiles with the same filename, this property will trigger the bin to merge at 20,000 and a new bin to be started for that filename. 20,000 is generally considered a good ceiling here to prevent excessive heap usage during the merge.
4. "Max Bin Age" <-- This is your force-merge property. No matter how many FlowFiles have been assigned to a given bin, that bin will be merged once it has existed for this amount of time. Set this to the maximum latency you are willing to accept for this dataflow.
Thank you, Matt
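Since the configuration screenshot is not reproduced here, the property values discussed above amount to roughly the following. This is an illustrative sketch, not the exact screenshot configuration, and the Max Bin Age value is a placeholder for whatever latency you can accept:

```python
# Sketch of the MergeContent settings described above (illustrative only).
merge_content_settings = {
    "Correlation Attribute Name": "filename",   # bin FlowFiles by filename
    "Minimum Number of Entries": 10_000,        # wait for up to 10,000 per bin
    "Maximum Number of Entries": 20_000,        # ceiling to protect heap
    "Max Bin Age": "<max acceptable latency>",  # forces the merge regardless
}
```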
11-22-2017
01:57 PM
@Lanic The reason you cannot empty that queue is because the "PutEmail" processor is still running and owns that FlowFile. When you stop a processor it goes into a "stopping" state. In this state NiFi will no longer schedule the processor to run, but it does not interrupt any running threads. The small number in the upper right corner indicates the number of active threads tied to this processor component. Here I see "2" on your PutEmail processor. Most likely one thread is the running configured processor task and the other is the stopping thread. Once these threads have completed, you will be able to empty the queue feeding this processor. If the thread(s) never release, then you have a hung processor. Sometimes this is the result of a poorly configured processor or associated controller service, and other times it may be code related. You will need to inspect several NiFi thread dumps to identify the hung thread and see what it is waiting on. You can get a thread dump by running the following command: <path to NiFi>/bin/nifi.sh dump <name for dump file> Thanks, Matt
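As a rough aid for reviewing those dumps (a sketch only, assuming the standard Java thread-dump format in which each thread section contains a "java.lang.Thread.State:" line), something like this can summarize how many threads are BLOCKED or WAITING in a given dump file:

```python
# Rough helper for eyeballing a NiFi thread dump; assumes standard Java
# thread-dump formatting with "java.lang.Thread.State:" lines per thread.
import sys
from collections import Counter

def summarize_thread_states(dump_path: str) -> Counter:
    states = Counter()
    with open(dump_path) as f:
        for line in f:
            line = line.strip()
            if line.startswith("java.lang.Thread.State:"):
                # e.g. "java.lang.Thread.State: BLOCKED (on object monitor)"
                states[line.split(":", 1)[1].strip().split()[0]] += 1
    return states

if __name__ == "__main__":
    print(summarize_thread_states(sys.argv[1]))
```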
10-30-2017
08:32 PM
1 Kudo
@dhieru singh Min number of entries must be set and defaults to 1. That is fine as long as you don't set max number of entries. You are correct: it is (min number of entries AND min group size) OR max number of entries OR max group size. So either of the "max" settings will force a merge, just like max bin age will.
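Expressed as a small Python sketch of the rule stated above (not NiFi's source code), a bin becomes eligible to merge when:

```python
# Sketch of the bin-eligibility rule described above (not NiFi source code).
def bin_ready_to_merge(entries, size_bytes, age_seconds,
                       min_entries=1, min_size=0,
                       max_entries=None, max_size=None, max_bin_age=None):
    minimums_met = entries >= min_entries and size_bytes >= min_size
    max_entries_hit = max_entries is not None and entries >= max_entries
    max_size_hit = max_size is not None and size_bytes >= max_size
    too_old = max_bin_age is not None and age_seconds >= max_bin_age
    return minimums_met or max_entries_hit or max_size_hit or too_old

# With the defaults (min entries 1, min size 0 B), a bin is eligible as soon
# as a single FlowFile lands in it, which is the behavior described earlier.
print(bin_ready_to_merge(entries=1, size_bytes=10, age_seconds=0))  # True
```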