How to find a memory leak inside a custom NiFi processor?

Experts,

I have written a custom processor and associated unit tests. Everything works well in a limited environment. However, when I set up a simple stress test, the processor causes NiFi itself to crash and restart over and over again. When I analyzed the heap dump generated by NiFi, the leak suspect report pointed to the org.apache.nifi.controller.repository.StandardProcessSession class, which was consuming 81% of the heap.

My questions are:

1) What could be causing this behavior?

2) Since this is a core NiFi class, how do I find where these instances are being created so I can identify the leak?

Re: How to find a memory leak inside a custom NiFi processor?

You'll want to review the heap dump further, but it sounds highly likely that FlowFile objects are building up in large numbers (tens or hundreds of thousands, or more) within the ProcessSession, which can quickly take up a lot of heap. The heap cost here is not the FlowFile content but the attributes, and those can still add up fast. Make sure you're committing the session frequently so those FlowFiles move along and the session is never tracking too many of them at once.

Having said that, you should also consider, for example, using ConsumePulsarRecord instead of plain ConsumePulsar. Working through the record model will dramatically outperform the alternative, unless you do your own raw record framing (newline delimiters, for example) so that a single FlowFile represents many records at once. If each record becomes its own FlowFile, just be sure you're committing the session frequently.
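To make the batching advice concrete, here is a minimal sketch of the pattern. It deliberately does not use the NiFi API: `ModelSession` is a hypothetical in-memory stand-in for `ProcessSession` that holds each uncommitted FlowFile's attribute map until `commit()` is called, and the `message.length` attribute is made up for illustration. What it shows is that committing every `batchSize` FlowFiles caps how many the session tracks at once; in a real processor the equivalent step would be transferring the batch and calling `session.commit()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BatchedCommitSketch {

    // Hypothetical in-memory stand-in for NiFi's ProcessSession (not the real API).
    // Each uncommitted "FlowFile" is modeled only by its attribute map, because
    // attributes, not content, are what the session keeps on the heap.
    static final class ModelSession {
        private final List<Map<String, String>> pending = new ArrayList<>();
        private int committed = 0;
        private int maxPending = 0;

        void create(Map<String, String> attributes) {
            pending.add(attributes);
            maxPending = Math.max(maxPending, pending.size());
        }

        void commit() {
            committed += pending.size();
            pending.clear(); // committed FlowFiles are no longer held by the session
        }

        int pendingCount() { return pending.size(); }
        int committedCount() { return committed; }
        int maxPending() { return maxPending; }
    }

    /** Emits one FlowFile per message, committing every batchSize FlowFiles. */
    static ModelSession consume(List<String> messages, int batchSize) {
        ModelSession session = new ModelSession();
        for (String message : messages) {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("message.length", String.valueOf(message.length()));
            session.create(attributes);
            if (session.pendingCount() >= batchSize) {
                session.commit(); // bounds how many FlowFiles are tracked at once
            }
        }
        session.commit(); // flush the final partial batch
        return session;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            messages.add("msg-" + i);
        }
        ModelSession session = consume(messages, 1_000);
        System.out.println("committed=" + session.committedCount()
                + " maxPending=" + session.maxPending());
    }
}
```

Running `main` should print `committed=100000 maxPending=1000`. Without the commit inside the loop, the pending count in this model would grow to the full 100,000 before anything is released.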

Re: How to find a memory leak inside a custom NiFi processor?

That was indeed the issue. The Pulsar consumer.receiveAsync() method returns a CompletableFuture, and each one would hang around until the broker had issued an acknowledgment. Given the speed of the Pulsar consumer, there were tens of thousands of these objects (each with an individual FlowFile) in memory at any given time.

I added an ExecutorCompletionService to the class, which limits the number of CompletableFuture instances in memory at any given time, and was able to run a 24-hour stress test without any memory leaks.
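The fix described above can be sketched as a sliding window over a `CompletionService`. This is a minimal sketch, not the poster's actual code: `receiveTask` is a hypothetical stand-in for a blocking Pulsar receive-and-acknowledge round trip (it is not the Pulsar client API), and the pool size and window size are arbitrary. Note that `ExecutorCompletionService` does not cap anything by itself; the bound comes from submitting a new task only after `take()` has handed back a completed one, so no more than `maxOutstanding` futures (and the messages they hold) are alive at once.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BoundedInFlightSketch {

    /** Totals reported by consume(). */
    static final class Result {
        final int processed;
        final int peak;

        Result(int processed, int peak) {
            this.processed = processed;
            this.peak = peak;
        }
    }

    // Hypothetical stand-in for a blocking receive-and-acknowledge round trip
    // against the broker; not the Pulsar client API.
    static Callable<String> receiveTask(int id) {
        return () -> {
            Thread.sleep(1); // simulated broker latency
            return "message-" + id;
        };
    }

    /** Consumes `total` messages, never holding more than `maxOutstanding` futures. */
    static Result consume(int total, int maxOutstanding) {
        if (maxOutstanding < 1) {
            throw new IllegalArgumentException("maxOutstanding must be >= 1");
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> completions = new ExecutorCompletionService<>(pool);
        int submitted = 0;
        int processed = 0;
        int outstanding = 0;
        int peak = 0;
        try {
            while (processed < total) {
                // Top up the window, but never past maxOutstanding pending futures.
                while (outstanding < maxOutstanding && submitted < total) {
                    completions.submit(receiveTask(submitted++));
                    outstanding++;
                    peak = Math.max(peak, outstanding);
                }
                // Block until any task finishes; its future is then unreferenced.
                String message = completions.take().get();
                outstanding--;
                processed++;
                // A real processor would create and transfer a FlowFile for `message` here.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("receive failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
        return new Result(processed, peak);
    }

    public static void main(String[] args) {
        Result r = consume(2_000, 64);
        System.out.println("processed=" + r.processed + " peakOutstanding=" + r.peak);
    }
}
```

Running `main` should print `processed=2000 peakOutstanding=64`. If you would rather stay with `receiveAsync()` and `CompletableFuture`, a `Semaphore` acquired before each call and released in the future's completion callback is another way to get the same bound.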