
NiFi iteration of queue entries between processors

Contributor

I'm trying to keep track of the position/count of each separate JSON created by SplitJson. I've tried using my own row.counter and the written attribute fragment.count. However, I cannot get the behavior I need, where each split is given an incremented value. Instead, every resulting JSON split is given the same value. Is there something specific (e.g. SegmentContent or similar) I should use to achieve this? Thanks.


Re: NiFi iteration of queue entries between processors

Super Guru

In NiFi 1.0.0+, SplitJson writes these four attributes to each split:

Name | Description
fragment.identifier | All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute
fragment.index | A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile
fragment.count | The number of split FlowFiles generated from the parent FlowFile
segment.original.filename | The filename of the parent FlowFile

Does fragment.index give you what you need? It is a 0-based, auto-incrementing integer that is unique among the FlowFiles split from the same incoming FlowFile.
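To show why fragment.count is identical on every split while fragment.index increments, here is a small plain-Python model of what SplitJson does with a JsonPath such as `$.*`. It is a sketch, not NiFi's implementation: the function name `split_json` and the dict layout used to represent a FlowFile are hypothetical.

```python
import json
import uuid


def split_json(parent_content: str, parent_filename: str) -> list:
    """Hypothetical model of SplitJson with JsonPath $.*: one child per element,
    each tagged with the fragment attributes listed in the table above."""
    data = json.loads(parent_content)
    # $.* selects every element of a top-level array, or every value of an object.
    elements = list(data.values()) if isinstance(data, dict) else list(data)
    identifier = str(uuid.uuid4())  # shared by all splits of this one parent
    children = []
    for index, element in enumerate(elements):
        children.append({
            "content": json.dumps(element),
            "attributes": {
                "fragment.identifier": identifier,
                # NiFi attribute values are strings, hence str(...).
                "fragment.index": str(index),           # 0, 1, 2, ... per parent
                "fragment.count": str(len(elements)),   # same on every split
                "segment.original.filename": parent_filename,
            },
        })
    return children


if __name__ == "__main__":
    splits = split_json('[{"id": "a"}, {"id": "b"}, {"id": "c"}]', "orders.json")
    for s in splits:
        print(s["attributes"]["fragment.index"], s["attributes"]["fragment.count"], s["content"])
    # 0 3 {"id": "a"}
    # 1 3 {"id": "b"}
    # 2 3 {"id": "c"}
```

In this model, reading fragment.count yields the same number for every split, which matches the behavior described in the question; fragment.index is the attribute that carries each split's position.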

Re: NiFi iteration of queue entries between processors

Contributor

Thanks for the answer. I did some more digging and tried the MergeContent processor, since in Defragment mode it consumes 'fragment.index'. However, I see the following error:

"Cannot Defragment StandardFlowFileRecord[uuid=71418088-c691-409e-8fbd-ef314c989487,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1480368245158-99, container=default, section=99], offset=353875, length=20716],offset=0,name=14446521839524859,size=20716] because it does not have an integer value for the fragment.index attribute"

Is there something I need to configure on the SplitJson processor so that it provides the attribute as an integer?
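That error means MergeContent could not parse an integer from fragment.index on that FlowFile, for example because the attribute is missing entirely. The checks behind Defragment mode can be sketched roughly as below. This is a simplified Python model under stated assumptions, not MergeContent's source: each FlowFile is represented as a dict with hypothetical keys `name`, `content` and `attributes`, and the function name `defragment` is illustrative.

```python
def defragment(flowfiles):
    """Simplified model of Defragment mode: validate the fragment attributes,
    wait for a complete bin, then reassemble contents in fragment.index order."""
    if not flowfiles:
        raise ValueError("nothing to defragment")
    ids = {ff["attributes"].get("fragment.identifier") for ff in flowfiles}
    if len(ids) != 1 or None in ids:
        raise ValueError("all fragments must share one fragment.identifier")
    for ff in flowfiles:
        index = ff["attributes"].get("fragment.index")
        # A missing attribute or a non-numeric string is rejected here.
        if index is None or not index.isdigit():
            raise ValueError(
                f"Cannot Defragment {ff['name']} because it does not have an "
                "integer value for the fragment.index attribute"
            )
    expected = int(flowfiles[0]["attributes"]["fragment.count"])
    if len(flowfiles) < expected:
        return None  # bin incomplete: keep waiting for the remaining fragments
    ordered = sorted(flowfiles, key=lambda ff: int(ff["attributes"]["fragment.index"]))
    return [ff["content"] for ff in ordered]
```

In this model nothing needs to be "converted" to an integer on the SplitJson side: the string "2" passes the check, while an absent attribute fails it.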

Re: NiFi iteration of queue entries between processors

Contributor

The function 'nextInt()' is close to what I require, but it is a global value that is independent of a specific run. I haven't been able to find a relatively simple example of how to use one of the 'fragment.*' approaches. Please help.
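The NiFi Expression Language guide describes nextInt() as a one-up value that increases over the lifetime of the running NiFi instance, so it never restarts for a new parent FlowFile. A per-parent index such as fragment.index does restart. The difference can be modeled in plain Python; `next_int` and the `label_*` helpers are hypothetical stand-ins, not NiFi APIs.

```python
import itertools

# Hypothetical stand-in for nextInt(): one counter shared by every FlowFile
# for the life of the process, so the numbering never resets per parent.
_global_counter = itertools.count()


def next_int() -> int:
    return next(_global_counter)


def label_with_global_counter(batches):
    """Each split gets next_int(): numbering keeps climbing across parents."""
    return [[next_int() for _ in batch] for batch in batches]


def label_per_parent(batches):
    """fragment.index-style numbering: restarts at 0 for each parent's splits."""
    return [list(range(len(batch))) for batch in batches]


if __name__ == "__main__":
    parents = [["a", "b", "c"], ["d", "e"]]  # splits from two parent FlowFiles
    print(label_with_global_counter(parents))  # [[0, 1, 2], [3, 4]]
    print(label_per_parent(parents))           # [[0, 1, 2], [0, 1]]
```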

Re: NiFi iteration of queue entries between processors

Super Guru

What are the values in fragment.index? Also what is your configuration for MergeContent? There is an example template for the SplitRouteMerge pattern here.

Re: NiFi iteration of queue entries between processors

Contributor

The values show as "no value set". I'm using all the defaults on MergeContent except the Merge Strategy, which has been changed to 'Defragment' to allow consumption of the 'fragment.*' attributes. The SplitJson processor uses the JsonPath expression $.* to split the structure. I believe the SplitRouteMerge template uses the default 'Bin-Packing Algorithm'. Thanks.

Re: NiFi iteration of queue entries between processors

Contributor

I've been experimenting with these. I'm not sure whether it's an issue of content versus attributes, but I cannot get the value to appear in the FlowFile. BTW, the FlowFile details show "Queue Position" as 'No value set'. I think that would also give me what I need if it were populated.

Re: NiFi iteration of queue entries between processors

Super Guru

Are you trying to get values of attribute(s) into the flow file content? For that you can use ReplaceText with NiFi Expression Language, or AttributesToJSON.
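If the goal is for each split's JSON body to carry its own position, the end result looks like the following. This is a plain-Python model of the outcome only, assuming each split is a JSON object; it is not how ReplaceText or AttributesToJSON are implemented, and the helper `embed_attribute` and the field name `position` are hypothetical.

```python
import json


def embed_attribute(content: str, attributes: dict, attribute: str, field: str) -> str:
    """Copy one FlowFile attribute into a JSON object's content as an integer field."""
    record = json.loads(content)
    if not isinstance(record, dict):
        raise ValueError("expected each split to be a JSON object")
    # Attributes are strings in NiFi; convert so the field is a JSON number.
    record[field] = int(attributes[attribute])
    return json.dumps(record)


if __name__ == "__main__":
    split = '{"id": "b"}'
    attrs = {"fragment.index": "1", "fragment.count": "3"}
    print(embed_attribute(split, attrs, "fragment.index", "position"))
    # {"id": "b", "position": 1}
```

Because the value is read from the attribute rather than from a counter, each split keeps the position it was assigned at split time regardless of queue order.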

Re: NiFi iteration of queue entries between processors

Contributor

I've tried that as well. I've also used the Retry-Count-Loop template at the link you provided (https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates). However, the loop treats all the queue entries as one pass instead of incrementing for each entry.

Re: NiFi iteration of queue entries between processors

Contributor

I'm wondering whether a custom approach is necessary. What I'm trying to do doesn't seem that complicated, but I have yet to find a working example of it.