That was indeed the issue. The Pulsar consumer.receiveAsync() method would return a CompletableFuture object, that would hang around until the broker had issued an acknowledgment. Given the speed of the Pulsar consumer, there were 10s of thousands of these objects (each with an individual FlowFile) in memory at any given time. I add an ExecutorCompletionService to the class, which limits the number of instances of CompletableFuture's at any given time, and was able to run a 24 hour stress test w/o any memory leaks.
... View more