Created 08-13-2018 06:25 PM
mergerecord-config.jpgI'm working on a flow that gathers some data from an API. Once the data is gathered, I have a total of 48 flow files, which are segmented by the metric they represent. There are 4 metrics and 12 flow files per metric (this structure arises from the way that the API is called). What I would like to do is to take those flow files and merge their records into one flow file per metric, but I'm having trouble with this.
My thought was that the most natural thing to do would be to use MergeRecord with its Defragment strategy to accomplish this. According to the documentation,
The "Defragment" Merge Strategy can be used when records need to be explicitly assigned to the same bin. For example, if data is split apart using the SplitRecord Processor, each 'split' can be processed independently and later merged back together using this Processor with the Merge Strategy set to Defragment. In order for FlowFiles to be added to the same bin when using this configuration, the FlowFiles must have the same value for the "fragment.identifier" attribute. Each FlowFile with the same identifier must also have the same value for the "fragment.count" attribute (which indicates how many FlowFiles belong in the bin) and a unique value for the "fragment.index" attribute so that the FlowFiles can be ordered correctly.
My flow files are not generated via any of the Split processors, but I figured this strategy should work for me. For each flowfile in pertaining to a metric, I set "fragment.identifier" to the name of the metric I'm collecting, and I set "fragment.count" to 12 (that number is computed but it's always 12 for the time being). As I understand the above documentation, this should be enough to make MergeRecord combine the flowfiles, but I also in another place that they need to have unique values of the "fragment.index" attribute, so I made sure that also holds.
As a result of this work, I get the following strange error:
Could not merge bin with 1 FlowFiles because the 'fragment.count' attribute had a value of '12' but only 1 of 12 FlowFiles were encountered before this bin was evicted (due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).
I'm not sure what this is supposed to mean. It's clear that the processor is recognizing how many flowfiles I'm trying to create, but for some reason it's telling me that the bin was evicted? I set the Maximum Number of Records parameter to something much larger than the number of records I have, and the Max Bin Age is 10 minutes, so that can't be it. I enabled debug logging in the processor, and found the following sequence:
2018-08-12 02:29:42,131 DEBUG [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.MergeRecord MergeRecord[id=c9263f6a-480b-19aa-a7a5-e5ebecd4209b] Got Group ID visits for StandardFlowFileRecord[uuid=9de71542-6fd0-4d9d-bfcb-c746129f38f8,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1533950635085-24170, container=default, section=618], offset=971108, length=69328],offset=0,name=8834764896238580,size=69328] 2018-08-12 02:29:42,131 DEBUG [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.MergeRecord MergeRecord[id=c9263f6a-480b-19aa-a7a5-e5ebecd4209b] Migrating id=15516457 to RecordBin[size=0, full=false, isComplete=false, id=380435] 2018-08-12 02:29:42,135 DEBUG [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.MergeRecord MergeRecord[id=c9263f6a-480b-19aa-a7a5-e5ebecd4209b] Created OutputStream using session StandardProcessSession[id=36005249715] for RecordBin[size=0, full=false, isComplete=false, id=380435] 2018-08-12 02:29:42,154 DEBUG [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.MergeRecord MergeRecord[id=c9263f6a-480b-19aa-a7a5-e5ebecd4209b] RecordBin[size=1, full=true, isComplete=false, id=380435] is now full. Completing bin. 2018-08-12 02:29:42,165 DEBUG [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.MergeRecord MergeRecord[id=c9263f6a-480b-19aa-a7a5-e5ebecd4209b] Marked RecordBin[size=1, full=true, isComplete=true, id=380435] as complete because complete() was called 2018-08-12 02:29:42,166 DEBUG [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.MergeRecord MergeRecord[id=c9263f6a-480b-19aa-a7a5-e5ebecd4209b] Closed Record Writer using session StandardProcessSession[id=36005249715] for RecordBin[size=1, full=true, isComplete=true, id=380435] 2018-08-12 02:29:42,166 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.MergeRecord MergeRecord[id=c9263f6a-480b-19aa-a7a5-e5ebecd4209b] Could not merge bin with 1 FlowFiles because the 'fragment.count' attribute had a value of '12' but only 1 of 12 FlowFiles were encountered before this bin was evicted (due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).In the above, "visits" is the name of my metric. What's weird is that once a single flowfile enters the bin, it's marked as full. Ok, I went and looked at the code of the processor; you can see if you follow the link that it attempts to compute the number of maximum records allowed, and if the conversion fails somehow, then it gets set to 1. I've attached my MergeRecord configuration as an image to this post, but you can see that there's no way maxRecords, if retrieved correctly, should be exceeded, nor can I see how parsing the string to an integer would fail. I'm at a complete loss. What's weird is that when I try to merge two flowflies that I generated by hand via GenerateFlowFile, the processor works as expected. But in my actual application, I can't for the life of me figure out what I need to do to make the merge work. I believe that I'm meeting the requirements of the processor, but this error message is completely unhelpful, and the only path I see to this failure is the bin being marked full prematurely for some reason. I don't know why that would happen, so I'm hoping someone in the community, maybe even one of the NiFi developers, might have an answer for me. I know I can achieve my end by doing some tricks with MergeContent, which hasn't given me this problem, but I feel like I want to understand just what is going on with MergeRecord. Thanks for any help.
Created 08-13-2018 07:31 PM
If you have your metric as an attribute you can choose bin algorithm and specify correlation attribute by metric name. Hope it will help
Created 08-13-2018 09:28 PM
I tried this, but the result was that it seemed to bin the files very oddly. I ended up with the wrong number of files; I wanted 4 and ended up with 13 for some reason. This strategy seems to work just fine with MergeContent, which did produce the correct number, but not with MergeRecord.
Created 08-27-2018 04:46 PM
can you sure some data? any logs? the flow template as XML?
Created 12-17-2018 10:02 PM
I believe I am running into the same issue, but have yet to find an answer. In my case, the code is not failing then defaulting to 1. It compares the record count of the first file with the value of fragment.count. Former is greater than the latter, so the bin gets marked as full. Crosslinking to hopefully shed more light on the issue.
Created 02-05-2019 05:17 AM
Hi, I am hitting the exact same issue as mentioned by OP. @Jerry Vinokurov, did you find any resoultion to this problem.
Created 02-05-2019 05:18 AM
Hi, I have the exact same issue. @Jerry Vinokurov , were you able to resolve the issue ?