Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

State Manager not persisting/retrieving data

Super Collaborator

NiFi 1.1.1

I am trying to persist a byte[] using the StateManager.

private byte[] lsnUsedDuringLastLoad;


@Override
public void onTrigger(final ProcessContext context,
        final ProcessSession session) throws ProcessException {
    ...

    ...

    ...
    final StateManager stateManager = context.getStateManager();
    try {
        StateMap stateMap = stateManager.getState(Scope.CLUSTER);
        final Map<String, String> newStateMapProperties = new HashMap<>();
        newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN,
                new String(lsnUsedDuringLastLoad));
        logger.debug("Persisting stateMap : "
                + newStateMapProperties);
        stateManager.replace(stateMap, newStateMapProperties,
                Scope.CLUSTER);
    } catch (IOException ioException) {
        logger.error("Error while persisting the state to NiFi",
                ioException);
        throw new ProcessException(
                "The state(LSN) couldn't be persisted", ioException);
    }

    ...
    ...
    ...
}

I don't get any exception or even an error entry in the log; the processor continues to run.

The following load code always returns a null value for the persisted field (the log shows "Retrieved the statemap : {}"):

try {
    stateMap = stateManager.getState(Scope.CLUSTER);
    stateMapProperties = new HashMap<>(stateMap.toMap());

    logger.debug("Retrieved the statemap : " + stateMapProperties);

    lastMaxLSN = (stateMapProperties
            .get(ProcessorConstants.LAST_MAX_LSN) == null || stateMapProperties
            .get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
            : stateMapProperties.get(
                    ProcessorConstants.LAST_MAX_LSN).getBytes();

    logger.debug("Attempted to load the previous lsn from NiFi state : "
            + lastMaxLSN);
} catch (IOException ioe) {
    logger.error("Couldn't load the state map", ioe);
    throw new ProcessException(ioe);
}

I am wondering whether ZooKeeper is at fault, or whether I have missed something in how I use the StateMap.
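Separately from the empty state map, the byte[] to String conversion in the snippets above is worth a second look. `new String(bytes)` followed by `getBytes()` uses the platform default charset and is not guaranteed to return the original bytes for arbitrary binary data. A minimal sketch of one alternative, assuming the LSN is opaque binary and that storing it as Base64 text is acceptable; `LsnCodec` is a hypothetical helper name, not part of NiFi:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper (not part of NiFi): converts a binary LSN to and from
// text that is safe to store in a Map<String, String>.
final class LsnCodec {

    private LsnCodec() {
    }

    // Encode raw bytes as Base64 text; every byte value survives this step.
    static String encode(final byte[] lsn) {
        return Base64.getEncoder().encodeToString(lsn);
    }

    // Decode stored text back to bytes; null or empty input means "no LSN stored".
    static byte[] decode(final String stored) {
        if (stored == null || stored.isEmpty()) {
            return null;
        }
        return Base64.getDecoder().decode(stored);
    }

    public static void main(final String[] args) {
        // Made-up sample value containing bytes that are not valid UTF-8.
        final byte[] lsn = {0x00, 0x00, 0x00, 0x2A, (byte) 0xFF, (byte) 0x80, 0x00, 0x01, 0x00, 0x03};

        // Charset round trip: invalid sequences are replaced while decoding, so bytes change.
        final byte[] viaString = new String(lsn, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
        System.out.println("String round trip intact: " + Arrays.equals(lsn, viaString));

        // Base64 round trip: the bytes come back unchanged.
        final byte[] viaBase64 = decode(encode(lsn));
        System.out.println("Base64 round trip intact: " + Arrays.equals(lsn, viaBase64));
    }
}
```

With this approach the value placed in the state map would be `LsnCodec.encode(lsnUsedDuringLastLoad)`, and the load side would call `LsnCodec.decode(...)` instead of `getBytes()`. This does not by itself explain the empty map; it only protects the value once it is actually stored.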

3 REPLIES

Master Guru

Can you edit your question to include the code you're using to store the value into the state map (and update the StateManager with it)?

New Member

I believe you need to set an initial state. It looks like you only call the `replace` method on the state manager. I believe it is necessary to initially call `setState`. An example of this can be found here
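To illustrate the suggestion: in the NiFi 1.x API, `StateManager.replace` returns a boolean rather than throwing when the update is not applied, and `StateMap.getVersion()` returns -1 when no state has been stored yet. The code in the question ignores that return value, which would be consistent with seeing neither an exception nor an error log. The sketch below shows the "set first, then replace and check the result" pattern. It runs against a hypothetical in-memory stand-in (`InMemoryStateStore`, `StatePersistence`) rather than NiFi's own classes, and the stand-in's rule that `replace` fails on an empty store is an assumption modelled on the reply above, not confirmed provider behaviour.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a versioned state store. It is NOT NiFi's StateManager;
// it only imitates getState/setState/replace so the pattern can run on its own.
final class InMemoryStateStore {

    // Immutable view of the stored values plus a version; -1 means nothing stored yet.
    static final class Snapshot {
        final long version;
        final Map<String, String> values;

        Snapshot(final long version, final Map<String, String> values) {
            this.version = version;
            this.values = Collections.unmodifiableMap(new HashMap<>(values));
        }
    }

    private Snapshot current = new Snapshot(-1L, Collections.<String, String>emptyMap());

    synchronized Snapshot getState() {
        return current;
    }

    // Unconditional write; always succeeds and bumps the version.
    synchronized void setState(final Map<String, String> values) {
        current = new Snapshot(current.version + 1, values);
    }

    // Conditional write. Assumption: fails when nothing has been stored yet
    // or when the store changed after the caller read it.
    synchronized boolean replace(final Snapshot expected, final Map<String, String> values) {
        if (current.version == -1L || expected.version != current.version) {
            return false;
        }
        current = new Snapshot(current.version + 1, values);
        return true;
    }
}

// Hypothetical helper showing the write pattern.
final class StatePersistence {

    private StatePersistence() {
    }

    static void persist(final InMemoryStateStore store, final String key, final String value) {
        final InMemoryStateStore.Snapshot before = store.getState();
        final Map<String, String> updated = new HashMap<>(before.values);
        updated.put(key, value);

        if (before.version == -1L) {
            // Nothing stored yet: create the initial state unconditionally.
            store.setState(updated);
            return;
        }
        // State exists: replace it, and treat a false result as a failure instead of ignoring it.
        if (!store.replace(before, updated)) {
            throw new IllegalStateException("State changed concurrently; value for " + key + " not persisted");
        }
    }
}
```

In a real processor the same shape would use `stateManager.getState(Scope.CLUSTER)`, `stateMap.getVersion()`, `stateManager.setState(map, Scope.CLUSTER)` and the boolean returned by `stateManager.replace(stateMap, map, Scope.CLUSTER)`. Logging or throwing when `replace` returns false would at least make a silently dropped update visible.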
