How are variables shared between concurrent instances of a processor in NiFi?

I am developing a custom processor for my team to handle processing of our flowfiles to various output paths and splitting based on specific conditions.

Recently I've run into an issue where, as far as I can tell, the only explanation is a thread-safety problem: a variable passes an if check and is then overwritten by another "instance" using that same variable.

So my question is, say I have a processor along the lines of:

...
...class MyCustomProcessor extends AbstractSessionFactory {

    AtomicReference<Set<String>> mySet = new AtomicReference<>();
    int myCounter;
    ...

    @OnScheduled
    public configure( session... {
        mySet.set(new HashSet<String>());
        myCounter = 0;
        ...
    }
    ...

    onTrigger( {
        // do stuff that may affect mySet and/or myCounter...

        if (mySet.get().size() > 0) {
            FlowFileAttributes myattributes = createCustomAttributesFromSet();
            flowFile = session.putAllAttributes(flowFile, myattributes);
            session.transfer(flowFile, Relationships.warnings);
            mySet.get().clear();
        } else {
            session.transfer(flowFile, Relationships.success);
        }
    }
}


Now let's say I have 5 or 10 concurrent instances of this running, set through the processor's scheduling options.

Do all of those instances have their own copies of the mySet and myCounter variables, or are these single variables shared between all instances, so that any thread can modify them at any point and cause unpredictable values?
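
To make the concern concrete, here is a minimal plain-Java sketch with no NiFi classes involved. It assumes the scenario the question is worried about: one processor object whose trigger method is called from several threads at once. The names FakeProcessor, trigger, sharedCounter and maxLocalSize are hypothetical and only serve the illustration. It shows that a field on the object is seen by every thread, while a variable created inside the method belongs to a single call.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedStateDemo {

    // Hypothetical stand-in for a processor: ONE object, called from many threads.
    static class FakeProcessor {
        // Instance field: every thread calling trigger() on this object updates the same counter.
        final AtomicInteger sharedCounter = new AtomicInteger();
        // Records the largest size any per-call set ever reached.
        final AtomicInteger maxLocalSize = new AtomicInteger();

        void trigger(String item) {
            sharedCounter.incrementAndGet();

            // Local variable: created fresh on each call, never visible to other threads.
            Set<String> localWarnings = new HashSet<>();
            localWarnings.add(item);
            maxLocalSize.accumulateAndGet(localWarnings.size(), Math::max);
        }
    }

    /** Runs `threads` workers against one FakeProcessor; returns {sharedCounter, maxLocalSize}. */
    static int[] run(int threads, int callsPerThread) throws InterruptedException {
        FakeProcessor processor = new FakeProcessor();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    processor.trigger("thread-" + id + "-item-" + i);
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return new int[] { processor.sharedCounter.get(), processor.maxLocalSize.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(5, 1000);
        System.out.println("shared counter = " + result[0]);     // 5000: all threads hit one field
        System.out.println("max local set size = " + result[1]); // 1: each call had its own set
    }
}
```

The counter only reaches the full total because it is an AtomicInteger; a plain int field like myCounter could lose updates under the same load. Likewise, wrapping a HashSet in an AtomicReference makes swapping the reference atomic, but the size check followed by clear() on the set itself is still a check-then-act sequence that another thread can interleave with.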