StateMap keys across different instances of the same processor

NiFi 1.2.0.

In a custom processor, an LSN (log sequence number) is used to fetch data from a SQL Server database table.

Following are the snippets of the code used for:

Storing a key-value pair

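// Needs: java.io.IOException, java.util.Base64, java.util.HashMap, java.util.Map,
// org.apache.nifi.components.state.Scope / StateManager / StateMap,
// org.apache.nifi.processor.exception.ProcessException
final StateManager stateManager = context.getStateManager();
try {
    // Read the current state first so that replace() can detect concurrent updates
    final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
    final Map<String, String> newStateMapProperties = new HashMap<>();

    // The LSN is a byte[]; state values must be Strings, so Base64-encode it
    final String lsnUsedDuringLastLoadStr = Base64.getEncoder().encodeToString(lsnUsedDuringLastLoad);
    // Just a constant String used as key
    newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN, lsnUsedDuringLastLoadStr);

    if (stateMap.getVersion() == -1) {
        // Version -1: no state has been stored yet
        stateManager.setState(newStateMapProperties, Scope.CLUSTER);
    } else {
        // replace() returns false if the stored state changed since getState()
        stateManager.replace(stateMap, newStateMapProperties, Scope.CLUSTER);
    }
} catch (final IOException e) {
    // Error handling was elided in the original snippet; rethrowing is one option
    throw new ProcessException("Failed to store the last LSN in cluster state", e);
}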

Retrieving the key-value pair

final StateManager stateManager = context.getStateManager();
byte[] lastMaxLSN = null;
try {
    final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
    // StateMap.get() returns null when the key has never been stored
    final String storedLsn = stateMap.get(ProcessorConstants.LAST_MAX_LSN);

    // Decode the Base64 string back into the raw LSN bytes; null means no previous load
    lastMaxLSN = (storedLsn == null || storedLsn.isEmpty())
            ? null
            : Base64.getDecoder().decode(storedLsn);
} catch (final IOException e) {
    // Error handling was elided in the original snippet; rethrowing is one option
    throw new ProcessException("Failed to read the last LSN from cluster state", e);
}
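
To rule out the encoding step as the source of the problem, the Base64 round trip can be checked in isolation, without NiFi. This is only a minimal sketch: LsnCodec and its methods are hypothetical names, not part of the processor or of NiFi, and the sample bytes are made up (a SQL Server LSN is a 10-byte binary value).

```java
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper, not part of NiFi: converts an LSN between byte[] and String
final class LsnCodec {
    private LsnCodec() {}

    // Encode the raw LSN bytes so they can be stored as a String state value
    static String encode(byte[] lsn) {
        return lsn == null ? null : Base64.getEncoder().encodeToString(lsn);
    }

    // Decode a stored value; null or empty means no LSN has been stored yet
    static byte[] decode(String stored) {
        if (stored == null || stored.isEmpty()) {
            return null;
        }
        return Base64.getDecoder().decode(stored);
    }

    public static void main(String[] args) {
        // Made-up 10-byte sample value
        byte[] lsn = {0x00, 0x00, 0x00, 0x2A, 0x00, 0x00, 0x01, 0x10, 0x00, 0x03};
        String stored = encode(lsn);
        byte[] restored = decode(stored);
        System.out.println(Arrays.equals(lsn, restored)); // prints "true"
    }
}
```

If the round trip holds, as it does here, the stored value itself is unlikely to be what differs between one and two running instances.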

When a single instance of this processor is running, the LSN is stored and retrieved properly and the logic of fetching data from SQL Server tables works fine.

The NiFi documentation on state management says:

Storing and Retrieving State

State is stored using the StateManager’s getState, setState, replace, and clear methods. All of these methods require that a Scope be provided. It should be noted that the state that is stored with the Local scope is entirely different than state stored with a Cluster scope. If a Processor stores a value with the key of My Key using the Scope.CLUSTER scope, and then attempts to retrieve the value using the Scope.LOCAL scope, the value retrieved will be null (unless a value was also stored with the same key using the Scope.CLUSTER scope). Each Processor’s state, is stored in isolation from other Processors' state.
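
My reading of the last sentence of that passage can be sketched as a toy model. This is not NiFi's implementation; ToyStateStore and the component IDs are hypothetical. It assumes that "each Processor" means each processor instance on the canvas, so that every instance has its own map per scope even when the key is the same.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model only, NOT NiFi's implementation; all names here are hypothetical
final class ToyStateStore {
    enum Scope { LOCAL, CLUSTER }

    // One independent key-value map per (componentId, scope) pair
    private final Map<String, Map<String, String>> states = new HashMap<>();

    private static String slot(String componentId, Scope scope) {
        return componentId + "/" + scope;
    }

    void setState(String componentId, Scope scope, Map<String, String> state) {
        states.put(slot(componentId, scope), new HashMap<>(state));
    }

    Map<String, String> getState(String componentId, Scope scope) {
        Map<String, String> state = states.get(slot(componentId, scope));
        return state == null
                ? Collections.<String, String>emptyMap()
                : Collections.unmodifiableMap(state);
    }

    public static void main(String[] args) {
        ToyStateStore store = new ToyStateStore();
        // Two instances of the same processor class store the same key
        store.setState("instance-1", Scope.CLUSTER, Collections.singletonMap("last.max.lsn", "AAAA"));
        store.setState("instance-2", Scope.CLUSTER, Collections.singletonMap("last.max.lsn", "BBBB"));

        // Each instance reads back only its own value
        System.out.println(store.getState("instance-1", Scope.CLUSTER).get("last.max.lsn")); // AAAA
        System.out.println(store.getState("instance-2", Scope.CLUSTER).get("last.max.lsn")); // BBBB
        // A different scope is a different map, so the key is absent there
        System.out.println(store.getState("instance-1", Scope.LOCAL).get("last.max.lsn"));   // null
    }
}
```

Under that assumption, the same key used by two instances would not collide, which is not what I seem to be observing.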

When two instances of this processor are running, only one is able to fetch the data. This has led to the following question:

Is the StateMap a 'global map' whose keys must be unique across instances of the same processor, and also across instances of different processors? In other words, whenever a processor puts a key in the StateMap, must that key be unique across all NiFi processors (and any other services that use the State API)? If so, can anyone suggest what unique key I should use in my case?

Note: I glanced at the code of the standard MySQL CDC processor (CaptureChangeMySQL.java), and it uses similar logic to store and retrieve state. Am I overlooking something?
