05-07-2022
11:59 PM
Hey all,

I am trying to write a script or a custom processor that groups data by attributes every hour. The queue holds 30-40k flow files on a single run, and it might go up to 200k depending on the case.

Solution 1: Consume all flow files, group them by attributes, then create a new flow file per group and push it downstream. Not ideal, but I gave it a try. I ran this with 33k flow files waiting in the queue, and the call below returns 10k every time, even though I increased the queue threshold numbers on the output flows:

    session.getQueueSize().getObjectCount()

Solution 2: A better approach is to consume one flow file and then filter the queue for flow files whose attributes match it:

    final List<FlowFile> flowFiles = session.get(file -> {
        if (correlationId.equals(Arrays.stream(keys).map(file::getAttribute).collect(Collectors.joining(":"))))
            return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
        return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
    });

Again with 33k waiting in the queue, I was expecting around 200 new grouped flow files, but 320 were created. It looks like the same issue as above: the filter does not scan all of the waiting flow files.

Questions:
- Is there a parameter I can change so that getObjectCount can go up to 300k?
- Is there a way to make the filter cover all waiting flow files, either by changing a parameter or by changing the processor?

I tried setting the default queue threshold to 300k in nifi.properties, but it didn't help.
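To make the expected result concrete, here is a minimal sketch of the grouping logic on its own, outside NiFi. It assumes plain `Map<String, String>` objects stand in for flow file attributes; the class name `GroupingSketch` and the helpers `correlationKey` and `groupByKey` are hypothetical names for illustration, not part of the NiFi API. It only shows how the ":"-joined key from the Solution 2 filter partitions a set of items. It does not reproduce or explain the queue behaviour described above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingSketch {

    // Builds the same ":"-joined key as the filter in Solution 2.
    // A missing attribute ends up as the literal text "null" in the key.
    static String correlationKey(Map<String, String> attributes, String[] keys) {
        return Arrays.stream(keys)
                .map(attributes::get)
                .collect(Collectors.joining(":"));
    }

    // Partitions the items by correlation key, keeping first-seen key order.
    static Map<String, List<Map<String, String>>> groupByKey(
            List<Map<String, String>> items, String[] keys) {
        return items.stream().collect(Collectors.groupingBy(
                item -> correlationKey(item, keys),
                LinkedHashMap::new,
                Collectors.toList()));
    }

    public static void main(String[] args) {
        // Hypothetical attribute names, chosen only for the example.
        String[] keys = {"site", "type"};
        List<Map<String, String>> items = List.of(
                Map.of("site", "A", "type", "x"),
                Map.of("site", "A", "type", "x"),
                Map.of("site", "B", "type", "x"));

        // One output group per distinct key is the count I would expect
        // if every queued flow file were visible to the filter.
        groupByKey(items, keys).forEach((key, group) ->
                System.out.println(key + " -> " + group.size()));
    }
}
```

If every waiting flow file were seen, the number of grouped outputs would equal the number of distinct keys. Getting more groups than distinct keys (320 instead of about 200) is consistent with the same key being processed in more than one partial batch.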
Labels: Apache NiFi