State Manager not persisting/retrieving data

Super Collaborator

NiFi 1.1.1

I am trying to persist a byte[] using the State Manager.

private byte[] lsnUsedDuringLastLoad;


@Override
public void onTrigger(final ProcessContext context,
        final ProcessSession session) throws ProcessException {
...

...

...
    final StateManager stateManager = context.getStateManager();
    try {
        StateMap stateMap = stateManager.getState(Scope.CLUSTER);
        final Map<String, String> newStateMapProperties = new HashMap<>();
        newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN,
                new String(lsnUsedDuringLastLoad));
        logger.debug("Persisting stateMap : " + newStateMapProperties);
        stateManager.replace(stateMap, newStateMapProperties, Scope.CLUSTER);
    } catch (IOException ioException) {
        logger.error("Error while persisting the state to NiFi", ioException);
        throw new ProcessException(
                "The state(LSN) couldn't be persisted", ioException);
    }

...
...
...
}

I don't get any exception or even an error entry in the log; the processor continues to run.

The following load code always returns a null value for the persisted field (the log shows "Retrieved the statemap : {}"):

try {
    stateMap = stateManager.getState(Scope.CLUSTER);
    stateMapProperties = new HashMap<>(stateMap.toMap());

    logger.debug("Retrieved the statemap : " + stateMapProperties);

    lastMaxLSN = (stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN) == null
            || stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN).isEmpty())
            ? null
            : stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN).getBytes();

    logger.debug("Attempted to load the previous lsn from NiFi state : "
            + lastMaxLSN);
} catch (IOException ioe) {
    logger.error("Couldn't load the state map", ioe);
    throw new ProcessException(ioe);
}
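A side note on the two snippets above, separate from the empty state map: the state map stores strings, and converting a raw byte[] with `new String(bytes)` and back with `getBytes()` is not guaranteed to return the original bytes when they are not valid text in the platform charset. The sketch below is only an illustration, assuming Base64 is an acceptable text encoding for the LSN; the class name `LsnCodec`, the sample bytes, and the explicit use of UTF-8 (the original code relies on the default charset) are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class LsnCodec {

    // Encode raw LSN bytes as Base64 text so they survive storage in a Map<String, String>.
    public static String encode(byte[] lsn) {
        return Base64.getEncoder().encodeToString(lsn);
    }

    // Decode stored text back to bytes; null or empty is treated as "no LSN stored yet".
    public static byte[] decode(String stored) {
        return (stored == null || stored.isEmpty())
                ? null
                : Base64.getDecoder().decode(stored);
    }

    public static void main(String[] args) {
        // Hypothetical LSN containing bytes (0xFF, 0x80) that are not valid UTF-8 text.
        byte[] lsn = {0x00, 0x00, 0x00, 0x2A, (byte) 0xFF, (byte) 0x80, 0x00, 0x01, 0x00, 0x03};

        // Round trip in the style of the question, with the charset pinned to UTF-8.
        byte[] viaString = new String(lsn, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);

        // Round trip through Base64 text.
        byte[] viaBase64 = decode(encode(lsn));

        System.out.println("String round trip intact: " + Arrays.equals(lsn, viaString));
        System.out.println("Base64 round trip intact: " + Arrays.equals(lsn, viaBase64));
    }
}
```

With these sample bytes the String round trip prints false (the invalid bytes are replaced during decoding) and the Base64 round trip prints true. Hex encoding would serve equally well; the point is only that the stored value should be a reversible text form of the bytes.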

I am wondering whether ZooKeeper (ZK) is at fault or whether I have missed something while using the State Map.

3 REPLIES

Master Guru

Can you edit your question to include the code you're using to store the value into the state map (and update the StateManager with it)?

Contributor

I believe you need to set an initial state. It looks like you only call the `replace` method on the state manager. I believe it is necessary to initially call `setState`. An example of this can be found here
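The idea in this reply can be sketched with a small in-memory model. This is not NiFi's `StateManager`: `InMemoryState` and `persist` are hypothetical stand-ins. The model assumes that `replace` is a compare-and-set that reports failure through its boolean return value (rather than an exception) and does nothing when no state has ever been stored, and that "never stored" is signalled by a version of -1. Whether a given NiFi release behaves exactly this way should be checked against its Javadoc.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplaceVsSetState {

    // Hypothetical stand-in for a component's stored state; NOT NiFi's StateManager.
    public static class InMemoryState {
        private Map<String, String> values = null; // null means state was never set
        private long version = -1;                 // assumption: -1 means "nothing stored yet"

        public long version() {
            return version;
        }

        public Map<String, String> snapshot() {
            return values == null ? new HashMap<>() : new HashMap<>(values);
        }

        // Unconditional write: creates the state if needed and bumps the version.
        public void setState(Map<String, String> newValues) {
            values = new HashMap<>(newValues);
            version++;
        }

        // Compare-and-set: succeeds only if state exists and the version still matches.
        public boolean replace(long expectedVersion, Map<String, String> newValues) {
            if (values == null || expectedVersion != version) {
                return false; // no exception, so ignoring the result hides the failure
            }
            setState(newValues);
            return true;
        }
    }

    // Write a key, seeding the state on first use and checking the result of replace.
    public static void persist(InMemoryState state, String key, String value) {
        long seenVersion = state.version();
        Map<String, String> updated = state.snapshot();
        updated.put(key, value);

        if (seenVersion == -1) {
            state.setState(updated);               // first write: nothing to replace yet
        } else if (!state.replace(seenVersion, updated)) {
            throw new IllegalStateException("State changed concurrently; value not persisted");
        }
    }

    public static void main(String[] args) {
        InMemoryState state = new InMemoryState();

        Map<String, String> attempt = new HashMap<>();
        attempt.put("LAST_MAX_LSN", "first");
        // Calling replace alone on never-set state fails quietly.
        System.out.println("replace on empty state: " + state.replace(state.version(), attempt));
        System.out.println("state after failed replace: " + state.snapshot());

        persist(state, "LAST_MAX_LSN", "first");
        persist(state, "LAST_MAX_LSN", "second");
        System.out.println("state after persist: " + state.snapshot());
    }
}
```

Under these assumptions, a processor that only ever calls `replace` and discards its return value would see no exception and no error log, yet would read back an empty map, which matches the symptoms described in the question.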
