Count number of incoming flowfiles
Labels: Apache NiFi
Created ‎02-14-2022 01:04 AM
Hi
Is there a way to get the number of input files, assign that count to an attribute, and assign a sequence number to each file?
I have files to merge using MergeContent, so I need to set fragment.index on each file and fragment.count to the total number of files to merge.
Created ‎02-14-2022 03:40 AM
Thank you for your question.
You could try the UpdateAttribute processor's stateful values to number the incoming flowfiles in batches.
============================
Here are the settings for UpdateAttribute
============================
Under the Advanced mode of the UpdateAttribute processor:
- Set two rules as below:
  - R0 -> initializeBatchIndex
    - Conditions:
      - ${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})})}
    - Actions (add fragment-related attributes):
      - fragment.count
        - ${batchSize}
      - fragment.identifier (each batch should get a new UUID as its identifier)
        - ${UUID()}
      - fragment.index
        - ${getStateValue('fragment.index'):plus(1):mod(${batchSize})}
  - R1 -> Iterations
    - Conditions:
      - ${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})}):not()}
    - Actions (add fragment-related attributes):
      - fragment.count (may be optional, since it is always the same within a given batch)
        - ${getStateValue('fragment.count')}
      - fragment.identifier
        - ${getStateValue('fragment.identifier')}
      - fragment.index
        - ${getStateValue('fragment.index'):plus(1):mod(${batchSize})}
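To make the effect of the two rules easier to see, here is a rough Python simulation of what they should assign to a stream of flowfiles. It does not call NiFi at all. The function name assign_fragments and the state dictionary are made up for illustration, and it assumes the processor's stored state starts with fragment.index at -1, which is what the equals(-1) check in the conditions implies.

```python
import uuid


def assign_fragments(num_flowfiles, batch_size, initial_index=-1):
    """Simulate the R0/R1 rules over num_flowfiles incoming flowfiles."""
    # Stand-in for the processor's stored state (assumed to start at -1).
    state = {
        "fragment.index": initial_index,
        "fragment.count": None,
        "fragment.identifier": None,
    }
    results = []
    for _ in range(num_flowfiles):
        prev = state["fragment.index"]
        # R0 condition: nothing seen yet, or the previous batch is full.
        starts_new_batch = prev == -1 or prev + 1 >= batch_size
        if starts_new_batch:
            state["fragment.count"] = batch_size
            state["fragment.identifier"] = str(uuid.uuid4())
        # R0 and R1 both compute the index as (previous + 1) mod batchSize.
        state["fragment.index"] = (prev + 1) % batch_size
        results.append(dict(state))
    return results


attrs = assign_fragments(num_flowfiles=5, batch_size=3)
indexes = [a["fragment.index"] for a in attrs]
identifiers = [a["fragment.identifier"] for a in attrs]
```

With batch_size set to 3, the first three flowfiles share one identifier and get indexes 0, 1, 2, and the fourth flowfile starts a new batch at index 0 with a fresh identifier.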
NOTE:
Before that, set a variable in your current process group: right-click an empty area inside the process group, select Variables, and add a variable named batchSize set to the number of files you want in each merged batch.
The flowfiles that share the same fragment.identifier are then merged together.
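As a simplified model of why the three fragment attributes matter, the sketch below groups flowfiles by fragment.identifier and only emits a merged result once as many fragments as fragment.count have arrived. This is only an illustration of the idea, not MergeContent's actual implementation. The defragment function, the dictionary layout, and the "content" key are assumptions made for the example.

```python
from collections import defaultdict


def defragment(flowfiles):
    """Merge flowfiles per fragment.identifier once all fragments are present."""
    bins = defaultdict(dict)  # identifier -> {index: content}
    merged = []
    for ff in flowfiles:
        identifier = ff["fragment.identifier"]
        bins[identifier][int(ff["fragment.index"])] = ff["content"]
        # A bin is complete when it holds fragment.count distinct indexes.
        if len(bins[identifier]) == int(ff["fragment.count"]):
            parts = bins.pop(identifier)
            merged.append("".join(parts[i] for i in sorted(parts)))
    # Second value: bins still waiting for missing fragments.
    return merged, dict(bins)


incoming = [
    {"fragment.identifier": "a", "fragment.index": "1", "fragment.count": "2", "content": "y"},
    {"fragment.identifier": "a", "fragment.index": "0", "fragment.count": "2", "content": "x"},
    {"fragment.identifier": "b", "fragment.index": "0", "fragment.count": "3", "content": "z"},
]
merged, pending = defragment(incoming)
```

In this model, batch "b" stays pending because only one of its three expected fragments arrived, which is the kind of situation a wrong fragment.count would cause.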
Please let me know if this helps.
Thanks & Regards,
Oliver Gong
Created ‎02-14-2022 05:49 AM
Hi @OliverGong
Thanks a lot for your helpful answer.
It increments the fragment.index attribute up to the batchSize value.
It works when I know how many files I want to merge, so I can set that value in the batchSize variable.
But when I don't know how many files there are to merge (they come from business users), fragment.count is not set correctly.
Is there a way to get the number of incoming files dynamically?
Created on ‎02-14-2022 10:08 AM - edited ‎02-14-2022 10:08 AM
I used GetHDFSFileInfo to get the number of incoming files from its hdfs.count.files attribute.
Then, at the end of the dataflow, I move the processed files into a separate folder so that only the files still to be merged stay in the root folder.
Thanks to @OliverGong for the hint 🙂
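For anyone who wants to see the idea outside NiFi, here is a small Python sketch of the same approach: count the files currently in a folder, use that count as fragment.count, and number each file as fragment.index. It uses a local temporary directory as a stand-in for the HDFS root folder and does not call GetHDFSFileInfo or HDFS. The function name fragment_attributes and the sample file names are hypothetical.

```python
import tempfile
import uuid
from pathlib import Path


def fragment_attributes(folder):
    """Derive fragment attributes from whatever files are in the folder now."""
    files = sorted(p for p in Path(folder).iterdir() if p.is_file())
    identifier = str(uuid.uuid4())  # one identifier for the whole batch
    return [
        {
            "filename": p.name,
            "fragment.identifier": identifier,
            "fragment.index": i,
            "fragment.count": len(files),  # counted at listing time
        }
        for i, p in enumerate(files)
    ]


# Local stand-in for the root folder that holds the files to merge.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.csv", "b.csv", "c.csv"):
        (Path(tmp) / name).write_text("data")
    attrs = fragment_attributes(tmp)
```

The count is only correct if the folder holds nothing but the files to merge, which is why the processed files are moved out at the end of the flow.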
