Count number of incoming flowfiles

Hi 

Is there a way to get the number of files in the input, assign that count to an attribute, and assign a sequence number to each file?

 

I have files to merge using MergeContent, so I need to set fragment.index on each file and fragment.count to the total number of files to merge.

2 ACCEPTED SOLUTIONS

Thank you for your question.

You can try the stateful values of the UpdateAttribute processor to number the incoming flow files in batches.

============================

Here are the settings for UpdateAttribute
============================

OliverGong_0-1644836598754.png


Under the Advanced Mode of UpdateAttribute Processor:

  1. Set two rules as below:
    • R0 -> initializeBatchIndex
      • Conditions:
        • ${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})})}
      • Actions (add fragment related attributes):
        • fragment.count
          • ${batchSize}
        • fragment.identifier (a new UUID is generated as the identifier for each batch)
          • ${UUID()}
        • fragment.index
          • ${getStateValue('fragment.index'):plus(1):mod(${batchSize})}
    • R1 -> Iterations
      • Conditions:
        • ${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})}):not()}
      • Actions (add fragment related attributes):
        • fragment.count (this action may be optional, since the count stays the same within a given batch)
          • ${getStateValue('fragment.count')}
        • fragment.identifier
          • ${getStateValue('fragment.identifier')}
        • fragment.index
          • ${getStateValue('fragment.index'):plus(1):mod(${batchSize})}
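To make the two rules easier to follow, here is a small Python simulation of what they do to a stream of flow files. It is only a sketch, not NiFi code: the function name assign_fragment_attributes is made up for illustration, the dict stands in for the processor's stored state, and it assumes the stateful fragment.index starts at -1, as the equals(-1) check in the conditions implies.

```python
import uuid


def assign_fragment_attributes(num_flowfiles, batch_size):
    """Simulate rules R0 and R1 over a stream of flow files (illustrative only)."""
    # Stand-in for the processor state; assumes the initial stateful value is -1.
    state = {"fragment.index": -1}
    tagged = []
    for _ in range(num_flowfiles):
        previous = state["fragment.index"]
        starts_new_batch = previous == -1 or previous + 1 >= batch_size
        if starts_new_batch:
            # R0 (initializeBatchIndex): fresh identifier, count taken from batchSize.
            count = batch_size
            identifier = str(uuid.uuid4())
        else:
            # R1 (Iterations): reuse the values stored by the previous flow file.
            count = state["fragment.count"]
            identifier = state["fragment.identifier"]
        attributes = {
            "fragment.count": count,
            "fragment.identifier": identifier,
            # Both rules compute the index the same way.
            "fragment.index": (previous + 1) % batch_size,
        }
        # The attributes just written become the state seen by the next flow file.
        state = dict(attributes)
        tagged.append(attributes)
    return tagged


if __name__ == "__main__":
    for attrs in assign_fragment_attributes(num_flowfiles=7, batch_size=3):
        print(attrs["fragment.index"], attrs["fragment.count"], attrs["fragment.identifier"])
```

With batch_size set to 3, the index cycles 0, 1, 2 and the identifier changes each time the index wraps back to 0.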

NOTE:
Before that, set a variable in your current Process Group: right-click an empty area inside the process group, select Variables, and add a variable named batchSize whose value is the number of files you want in each merged batch.

OliverGong_1-1644837695656.png

OliverGong_2-1644837915558.png


Flow files that share the same fragment.identifier are then merged together.
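As a rough picture of that merge step, the Python sketch below groups flow files by fragment.identifier, waits until fragment.count pieces have arrived, and then joins them in fragment.index order. It only models the idea behind MergeContent's Defragment strategy. The names make_flowfile and defragment and the dict layout are hypothetical, not NiFi APIs.

```python
from collections import defaultdict


def make_flowfile(identifier, index, count, content):
    """Hypothetical helper: build a dict that stands in for a flow file."""
    return {
        "attributes": {
            "fragment.identifier": identifier,
            "fragment.index": index,
            "fragment.count": count,
        },
        "content": content,
    }


def defragment(flowfiles):
    """Merge bundles that are complete; return (merged contents, still-pending bins)."""
    bins = defaultdict(list)
    merged = []
    for flowfile in flowfiles:
        attrs = flowfile["attributes"]
        identifier = attrs["fragment.identifier"]
        bins[identifier].append(flowfile)
        # A bundle is complete once fragment.count pieces share the identifier.
        if len(bins[identifier]) == int(attrs["fragment.count"]):
            bundle = sorted(
                bins.pop(identifier),
                key=lambda f: int(f["attributes"]["fragment.index"]),
            )
            merged.append("".join(f["content"] for f in bundle))
    return merged, dict(bins)


if __name__ == "__main__":
    incoming = [
        make_flowfile("a", 1, 2, "B"),
        make_flowfile("b", 0, 3, "x"),
        make_flowfile("a", 0, 2, "A"),
    ]
    merged, pending = defragment(incoming)
    print(merged)          # bundle "a" is complete
    print(list(pending))   # bundle "b" is still waiting for pieces
```

This also shows why a wrong fragment.count is a problem: a bundle whose count is larger than the number of files that actually arrive never completes.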

OliverGong_4-1644838118741.png

OliverGong_5-1644838635937.png

 

Please let me know if this helps.

Thanks & Regards,
Oliver Gong

I used GetHDFSFileInfo to get the number of incoming files from the hdfs.count.files attribute.

Then, at the end of the dataflow, I move the processed files into a separate folder so that only the files still to be merged stay in the root folder.
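The post does not show how the count was wired into the fragment attributes, so the following Python sketch is only one possible reading: take the listing of the folder, use its length as fragment.count (the role hdfs.count.files plays above), and number the files in order. The function name tag_listing and the sorted-by-name ordering are assumptions for illustration.

```python
import uuid


def tag_listing(filenames):
    """Give every file in one listing a shared identifier, a total count, and an index."""
    total = len(filenames)           # plays the role of hdfs.count.files
    identifier = str(uuid.uuid4())   # one identifier for the whole listing
    return [
        {
            "filename": name,
            "fragment.identifier": identifier,
            "fragment.index": index,
            "fragment.count": total,
        }
        # Sorting by name is an assumption; any stable order would do.
        for index, name in enumerate(sorted(filenames))
    ]


if __name__ == "__main__":
    for attrs in tag_listing(["b.csv", "a.csv", "c.csv"]):
        print(attrs["filename"], attrs["fragment.index"], attrs["fragment.count"])
```

Because the count comes from the listing itself, it stays correct only if the folder holds nothing but the files to merge, which is why the processed files get moved out at the end.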

 

Thanks to @OliverGong for the hint 🙂


3 REPLIES

Hi @OliverGong 

Thanks a lot for your helpful answer.

It increments the fragment.index attribute up to the value of the batchSize variable.

It works when I know how many files I want to merge, so I can set that value in the batchSize variable.

But when I don't know how many files to merge (they come from business users), fragment.count is not set correctly.

Is there a way to get the number of incoming files dynamically?

 

 
