Support Questions

Count number of incoming flowfiles

Contributor

Hi 

Is there a way to count the incoming files, store that count in an attribute, and assign an index number to each file?

I want to merge these files with MergeContent, so I need to set fragment.index on each file and fragment.count to the total number of files to merge.

2 ACCEPTED SOLUTIONS

Contributor

Thank you for your question.

You can try using the UpdateAttribute processor's stateful values to handle the incoming flow files in batches.

============================

Here are the settings for UpdateAttribute:
============================


In the Advanced mode of the UpdateAttribute processor:

  1. Set two rules as below:
    • R0 -> initializeBatchIndex
      • Conditions:
        • ${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})})}
      • Actions (add fragment related attributes):
        • fragment.count
          • ${batchSize}
        • fragment.identifier (a new UUID is generated as the identifier for each batch)
          • ${UUID()}
        • fragment.index
          • ${getStateValue('fragment.index'):plus(1):mod(${batchSize})}
    • R1 -> Iterations
      • Conditions:
        • ${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})}):not()}
      • Actions (add fragment related attributes):
        • fragment.count (this action may be optional, since the count is always the same within a batch)
          • ${getStateValue('fragment.count')}
        • fragment.identifier
          • ${getStateValue('fragment.identifier')}
        • fragment.index
          • ${getStateValue('fragment.index'):plus(1):mod(${batchSize})}
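
To see how the two rules cooperate, here is a small Python simulation of the same state machine outside NiFi. It is only a sketch: it assumes the stateful fragment.index starts at -1 (which is what the equals(-1) check in both conditions implies), and the Fragment class and assign_fragments function are hypothetical names used for illustration, not part of NiFi.

```python
import uuid
from dataclasses import dataclass
from typing import List


@dataclass
class Fragment:
    identifier: str
    index: int
    count: int


def assign_fragments(num_flowfiles: int, batch_size: int) -> List[Fragment]:
    """Simulate rules R0/R1: return the fragment attributes each flow file receives."""
    # Assumed initial state: fragment.index = -1 (matches the equals(-1) check).
    prev_index, prev_identifier, prev_count = -1, "", batch_size
    result: List[Fragment] = []
    for _ in range(num_flowfiles):
        next_index = (prev_index + 1) % batch_size
        if prev_index == -1 or prev_index + 1 >= batch_size:
            # R0 (initializeBatchIndex): start a new batch with a fresh UUID.
            identifier, count = str(uuid.uuid4()), batch_size
        else:
            # R1 (Iterations): reuse the identifier and count from the previous flow file.
            identifier, count = prev_identifier, prev_count
        result.append(Fragment(identifier, next_index, count))
        # Keep these values for the next flow file, like getStateValue() reads them back.
        prev_index, prev_identifier, prev_count = next_index, identifier, count
    return result


for f in assign_fragments(5, 2):
    print(f.index, f.count, f.identifier[:8])
```

With 5 files and batchSize = 2, the indexes run 0, 1, 0, 1, 0, so the last batch is labelled fragment.count = 2 but has only one member, and MergeContent would have to wait for a fragment that never arrives. This is the limitation raised in the follow-up reply when the total number of files is not known in advance.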

NOTE:
Before doing this, define a variable in your current Process Group: right-click an empty area inside the process group, select Variables, and add a variable named batchSize whose value is the number of flow files you want in each merged output.


MergeContent then merges the flow files that share the same fragment.identifier.
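
For context, fragment.identifier, fragment.index and fragment.count are the attributes MergeContent reads when its Merge Strategy is set to Defragment. The sketch below is a simplified Python model of that grouping, not NiFi's implementation: flow files are plain (attributes, content) tuples, a group is emitted once fragment.count pieces with the same identifier have arrived, and contents are concatenated in fragment.index order. Bin age limits and other MergeContent properties are ignored, and the defragment function is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# A flow file modelled as (attributes, content); NiFi attribute values are strings.
FlowFile = Tuple[Dict[str, str], bytes]


def defragment(flowfiles: List[FlowFile]) -> Tuple[List[bytes], List[str]]:
    """Merge fragments that share fragment.identifier once all of them have arrived.

    Returns the merged contents plus the identifiers whose bins are still incomplete.
    """
    bins: Dict[str, Dict[int, bytes]] = defaultdict(dict)
    merged: List[bytes] = []
    for attrs, content in flowfiles:
        ident = attrs["fragment.identifier"]
        bins[ident][int(attrs["fragment.index"])] = content
        if len(bins[ident]) == int(attrs["fragment.count"]):
            pieces = bins.pop(ident)
            # Concatenate in fragment.index order, regardless of arrival order.
            merged.append(b"".join(pieces[i] for i in sorted(pieces)))
    return merged, list(bins)


example = [
    ({"fragment.identifier": "a", "fragment.index": "1", "fragment.count": "2"}, b"world"),
    ({"fragment.identifier": "a", "fragment.index": "0", "fragment.count": "2"}, b"hello "),
]
print(defragment(example))  # ([b'hello world'], [])
```

Returning the incomplete identifiers makes the "short last batch" case visible instead of silently dropping it.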


Please let me know if this helps.

Thanks & Regards,
Oliver Gong

Contributor

I used GetHDFSFileInfo to get the number of incoming files from its hdfs.count.files attribute.

Then, at the end of the dataflow, I move the processed files into a separate folder so that only the files still to be merged remain in the root folder.
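
A minimal local sketch of the same idea in Python, assuming the files sit in a local directory rather than HDFS: list the regular files directly under the root folder, use their number as fragment.count (the role hdfs.count.files plays in the flow), and give each file an index under one shared identifier. Because the listing is non-recursive and keeps only regular files, anything already moved into a subfolder is not counted, which mirrors the cleanup step described above. The function name is hypothetical and this is not how GetHDFSFileInfo works internally.

```python
import uuid
from pathlib import Path
from typing import Dict


def fragment_attributes_for_dir(root: Path) -> Dict[str, Dict[str, str]]:
    """Return NiFi-style fragment attributes for every regular file directly under root."""
    # Non-recursive: subfolders (e.g. one holding processed files) are skipped.
    files = sorted(p for p in root.iterdir() if p.is_file())
    identifier = str(uuid.uuid4())  # one identifier shared by the whole merge group
    count = len(files)  # stands in for hdfs.count.files
    return {
        p.name: {
            "fragment.identifier": identifier,
            "fragment.index": str(i),
            "fragment.count": str(count),
        }
        for i, p in enumerate(files)
    }
```

Since the count comes from the directory listing rather than a fixed batchSize variable, fragment.count always matches the number of files actually waiting to be merged.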

 

Thanks to @OliverGong for the hint 🙂

Contributor

Hi @OliverGong 

Thanks a lot for your helpful answer.

It increments the fragment.index attribute up to the batchSize value.

It works when I know how many files I want to merge, because I can set that value in the batchSize variable.

But when I don't know in advance how many files there are to merge (the files come from business users), fragment.count is not set correctly.

Is there a way to get the number of incoming files dynamically?
