Created 04-29-2020 06:48 AM
Hi,
I have NiFi flow where I am doing data validation using ValidateCsv processor(Line by line validation). I have N number of processor connected one after another i.e. serially. Note that I am connecting both valid and invalid relationship to next ValidateCsv processor and so on. So here each ValidateCsv processor could create 2 flow file i.e 1 valid and another is invalid or only valid or invalid with common attributes.
After doing validation I need to trigger next processor only once(Ex. Execute-SQL to ingest data from another table), to achieve this I am using merge content (Using Defragment strategy) processor to combine all flow-file to 1. But all flow-file are not merging together as few flow-file are having same attribute like fragment.index.
Can you please suggest how can I wait and combine all flow-files to 1 flow-file so that I can trigger next processor only once.
Created 04-30-2020 12:06 PM
If you have multiple FlowFiles with the same fragement.index and fragment.identifier, this indicates that at some point in your dataflow this FlowFIle was likely cloned (Same FlowFile routed to multiple relationships) or split (two FlowFiles created from one). For example you dragged the success relationship twice. Each of those FlowFiles will have however different UUID assigned to them. You could use NiFi Provenance to look at the lineage of these FlowFIles to see at what point in you flow the clone was created. In your case a CSV record that gets split into two FlowFIles (one with invalid records and another with some valid records) is going to be your issue since you route them to same destination processor.
You didn't share your MergeContent processor configuration, but keep in mind the following:
1. A processor executes based on its configured run Schedule. The default run schedule ins always 0 secs which means execute as fast as it can. This means a processor will execute and once it completes that execution, it will execute again. (If at execution there is no work to do, it will yield for a period of time before next execution is allowed to avoid excessive CPU usage when no work exists).
2. At time of Execution the MergeContent will look at what new FlowFiles since last execution and allocate those FlowFiles to bins. If at end of that execution a bin meets the minimums configured, the bin will be merged (Mins don't matter if using Defragment merge strategy).
3. When using Defragment, the expectation is that all FlowFiles with same Fragment.identifier also have same Fragment.count and that only one FlowFile exists in a bin for each fragment.index value.
It would be impossible to reassemble to original FlowFile since you have multiple FlowFiles now with the same fragment.identifier and fragment.index values. The real question is whether you are actually trying to reassemble the exact original FlowFile or simply create a single FlowFile for which to trigger downstream processing?
If the latter is the case, don't use Defragment merge strategy. Use Bin-Packing Algorithm and set fragment.identifier as the "Correlation Attribute Name". Since you really have no idea how many fragments there will be at this point, set your mins number of entires to a value large enough it would never be reached for a single fragment.identifier and then set the "max bin age" to a value high enough that you fee comfortable all fragments would have made it to the MergeContent. Max.bin.age will force a bin to be merged at x amount of time after first FlowFile was allocated to that bin even if mins were not met.
Also make sure you have configured n+1 number of bins, where "n" is the number of expected unique fragment.identifiers you expect to have at any one being binned by the MergeContent processor.
Hope this helps,
Matt
Created 05-05-2020 03:56 AM
Hi Matt,
Thanks for your response!
1. We don't have any success relationship twice. I know if we drag it twice then it will create duplicate copy of flow file. Yes, you are right in my case I am using multiple ValidateCsv processor to apply different validation on CSV data. Note that I have connected the ValidateCsv processor one by one i.e. each processor is responsible to do different kind of validation(Ex: Null Check, Empty Check,Unique,Equals etc.).
2. In this case if each flow file contains few invalid record then it will split into two Flow Files (one with invalid records and another with some valid records). Again these two Flow File are input to next ValidateCsv processor, here also again if both Flow File contains few invalid record then again they are going to split into two and so on. Here my problem is each Flow File after split having same attributes like fragment.count, fragment.identifier,fragment.index, I want to avoid these duplicate Flow Files after all validation is done.
3. I am not trying to reassemble the exact original Flow File, I simply want to create a single Flow File which is going to trigger downstream processing/processor only once.
4. MergeContent processor configuration
Merge Strategy : Defragment
Attribute Strategy : Keep All Unique Attributes
(Note all other properties are default one)
Important: We have clustered NiFi instance.
5. I have tried by using Bin-Packing Algorithm and set fragment.identifier as the "Correlation Attribute Name". And by setting the "max bin age" It is working. But we don't have any idea about the amount of data we are going to process and how much time it will take to process data as we are creating these kind of flows dynamically using NiFi REST API in different environment. In this case we can't set approximate value of "max bin age" instead of that we want to trigger downstream processing immediately after first processing is complete.
I am looking for a logic which will provide me a single Flow File out of these multiple Flow File.
Thank you,
Prashant
Created 05-05-2020 12:07 PM
My next thought would be to just drop the cloned FlowFile right before the MergeContent since you intent is not to reassemble the exact same original file. Your intent, if i understand correctly, is to just create one file that contains 1 FlowFile with each unique fragment index.
If that is the case, perhaps all you need is a detectDuplicate in front of your mergeContent that uses a combination of Fragment.identifier and fragment.index for determining if there is a duplicate FlowFile to drop. You would simply set the "Cache Entry Identifier" to "${fragment.identifier}-${fragment.index}". Then only the first occurrence of a FlowFile with unique fragment.index+fragment.identifier will make it in to the connection queue for your MergeContent processor. Then the Defragment merge strategy will work successfully giving you what I believe you are looking for here.
Hope this helps with your use case,
Matt