NiFi 1.16.1: Get or filter all waiting flow files in the queue, up to 300k

Hey all,
I am trying to run a script or a custom processor that groups data by attributes every hour. The queue holds 30-40k flow files on a single run, and it might go up to 200k depending on the case.

Solution 1: Consume all flow files, group them by attributes, create a new flow file for each group, and push the new ones. Not ideal, but I gave it a try.
I ran this with 33k flow files waiting in the queue and checked the count:

session.getQueueSize().getObjectCount()

This always returns 10k, even though I increased the queue threshold numbers on the output flows.
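
For reference, the grouping step in Solution 1 looks roughly like the sketch below. It uses plain maps in place of flow file attributes so it runs without NiFi; `AttributeGrouping`, `correlationKey`, and `groupByKey` are hypothetical names for illustration, not part of the NiFi API, and the ":" separator is taken from the filter in Solution 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not NiFi API: each Map stands in for one flow file's attributes.
final class AttributeGrouping {

    // Joins the values of the grouping attributes with ":" to form a correlation key.
    static String correlationKey(Map<String, String> attributes, String[] keys) {
        return Arrays.stream(keys)
                .map(attributes::get)
                .collect(Collectors.joining(":"));
    }

    // Buckets the attribute maps by correlation key, keeping first-seen order of the keys.
    static Map<String, List<Map<String, String>>> groupByKey(
            List<Map<String, String>> items, String[] keys) {
        Map<String, List<Map<String, String>>> groups = new LinkedHashMap<>();
        for (Map<String, String> attributes : items) {
            groups.computeIfAbsent(correlationKey(attributes, keys), k -> new ArrayList<>())
                  .add(attributes);
        }
        return groups;
    }
}
```

In a real processor each bucket would then become one merged flow file. The grouping itself is simple; the problem is that only part of the queue seems to reach it.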
Solution 2: A better approach is to consume one flow file and then filter for flow files matching the provided attributes:

final List<FlowFile> flowFiles = session.get(file -> {
    // Build this flow file's key by joining the values of the grouping attributes with ":".
    final String key = Arrays.stream(keys).map(file::getAttribute).collect(Collectors.joining(":"));
    if (correlationId.equals(key)) {
        return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
    }
    return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
});

Again, with 33k flow files waiting in the queue, I expected around 200 new grouped flow files, but 320 were created. It looks like the same issue as above: the filter does not scan all of the waiting flow files.
Questions:
Is there a parameter I can change so that getObjectCount can return up to 300k?

Is there a way to filter all waiting flow files, either by changing a parameter or by changing the processor?

 - I tried setting the default queue threshold to 300k in nifi.properties, but it didn't help.
