
Implement batched file processing in NiFi

New Contributor

Hello, 
I am trying to implement file processing in a batched manner using NiFi processors.
My use case: 70-80K files arrive daily, each 200-300 MB in size.
The files are picked up from an S3 store and sent to Spark for execution via the Livy processor. The plan is not to send each file location to Spark individually; instead, we want to batch a few files together and send the batch to Spark via Livy, so the number of Livy connections to Spark is reduced.

Below are the considerations when creating a batch (a rough sketch of the intended policy follows the list):
1. Batch size: a batch is based on total size, e.g. 1000 MB.
2. Wait duration: if there are not enough files to reach the batch size, the batch is sent anyway after a specific wait duration.
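
To make the intended policy concrete, here is a rough sketch in Python. It is only an illustration of the logic, not part of the NiFi flow: the threshold values and the submit_to_spark() placeholder are made up, and the real submission would happen via Livy.

    import time

    BATCH_SIZE_BYTES = 1000 * 1024 * 1024  # consideration 1: flush at ~1000 MB total
    MAX_WAIT_SECONDS = 600                 # consideration 2: flush after this wait (placeholder value)

    batch = []            # accumulated S3 object paths
    batch_bytes = 0       # total size of the current batch
    batch_started = None  # time the first file entered the current batch

    def submit_to_spark(paths):
        # Placeholder for a single Livy submission covering the whole batch.
        print("submitting %d files in one Livy call" % len(paths))

    def add_file(s3_path, size_bytes):
        # Accumulate incoming files; flush when either threshold is reached.
        # (In a real flow the wait-duration flush would also need to fire
        # even when no new files arrive.)
        global batch, batch_bytes, batch_started
        if not batch:
            batch_started = time.time()
        batch.append(s3_path)
        batch_bytes += size_bytes
        too_big = batch_bytes >= BATCH_SIZE_BYTES
        too_old = time.time() - batch_started >= MAX_WAIT_SECONDS
        if too_big or too_old:
            submit_to_spark(batch)
            batch, batch_bytes, batch_started = [], 0, None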

I am trying to implement this using Wait, Notify, and UpdateAttribute (with stateful variables) based on batch size and wait time, but it is not fully working.

Any leads/suggestions on how to implement this would be much appreciated.
Thanks.


1 REPLY

Master Mentor

@askh88 

I don't know anything about the "livy processor" you are using, but NiFi processors typically execute against a single FlowFile at a time. So using Wait/Notify to delay FlowFiles from reaching the Livy processor until you have some number of FlowFiles within a target total size range would likely not make much difference in controlling the number of Spark connections.

The question here is whether it is possible to merge multiple FlowFiles into one FlowFile that can be passed to your Livy processor. I don't know the structure of your data or whether a merge is possible via a MergeContent or MergeRecord processor, but if that merging of FlowFiles is possible, it is the better route to take here.
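
If MergeContent does fit your data, a rough starting point that maps your two batching considerations onto its thresholds might look like the following. The values are only examples and property names can vary slightly between NiFi versions, so treat this as a sketch rather than a tested configuration:

    Merge Strategy     = Bin-Packing Algorithm
    Merge Format       = Binary Concatenation
    Minimum Group Size = 1000 MB      (maps to your batch-size consideration)
    Maximum Group Size = 1200 MB      (illustrative upper bound)
    Max Bin Age        = 10 min       (maps to your wait-duration consideration)
    Delimiter Strategy = Text, with a newline Demarcator so the merged content is one entry per line

One caveat: MergeContent's size thresholds apply to the FlowFiles' own content size, not to attributes. If your FlowFiles only carry S3 object paths rather than the actual 200-300 MB content, you would likely batch on Minimum/Maximum Number of Entries (or use MergeRecord's record-count and Max Bin Age settings) instead of group size.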

Please help our community thrive. If any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on the one(s) that helped.

Thank you,
Matt