Support Questions

Find answers, ask questions, and share your expertise

State Manager not persisting/retrieving data

avatar
Super Collaborator

NiFi 1.1.1

I am trying to persist a byte [] using the State Manager.

private byte[] lsnUsedDuringLastLoad;


@Override
	public void onTrigger(final ProcessContext context,
			final ProcessSession session) throws ProcessException {
...

...

...
final StateManager stateManager = context.getStateManager();
try {
StateMap stateMap = stateManager.getState(Scope.CLUSTER);
final Map<String, String> newStateMapProperties = new HashMap<>();
newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN,
new String(lsnUsedDuringLastLoad));
logger.debug("Persisting stateMap : "
+ newStateMapProperties);
stateManager.replace(stateMap, newStateMapProperties,
Scope.CLUSTER);
} catch (IOException ioException) {
logger.error("Error while persisting the state to NiFi",
ioException);
throw new ProcessException(
"The state(LSN) couldn't be persisted", ioException);
}

...
...
...
}

I don't get any exception or even a log error entry, the processor continues to run.

The following load code always returns a null value(Retrieved the statemap : {})for the persisted field :

try {
					stateMap = stateManager.getState(Scope.CLUSTER);
					stateMapProperties = new HashMap<>(stateMap.toMap());
					
					logger.debug("Retrieved the statemap : "+stateMapProperties);


					lastMaxLSN = (stateMapProperties
							.get(ProcessorConstants.LAST_MAX_LSN) == null || stateMapProperties
							.get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
							: stateMapProperties.get(
									ProcessorConstants.LAST_MAX_LSN).getBytes();


					logger.debug("Attempted to load the previous lsn from NiFi state : "
							+ lastMaxLSN);
				} catch (IOException ioe) {
					logger.error("Couldn't load the state map", ioe);
					throw new ProcessException(ioe);
				}

I am wondering if the ZK is at fault or have I missed something while using the State Map !

1 ACCEPTED SOLUTION

avatar
Master Guru
3 REPLIES 3

avatar
Master Guru

Can you edit your question to include the code you're using to store the value into the state map (and update the StateManager with it)?

avatar
New Member

I believe you need to set an initial state. It looks like you only call the `replace` method on the state manager. I believe it is necessary to initially call `setState`. An example of this can be found here

avatar
Master Guru