Created 03-20-2017 11:41 AM
NiFi 1.1.1
I am trying to persist a byte [] using the State Manager.
private byte[] lsnUsedDuringLastLoad; @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { ... ... ... final StateManager stateManager = context.getStateManager(); try { StateMap stateMap = stateManager.getState(Scope.CLUSTER); final Map<String, String> newStateMapProperties = new HashMap<>(); newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN, new String(lsnUsedDuringLastLoad)); logger.debug("Persisting stateMap : " + newStateMapProperties); stateManager.replace(stateMap, newStateMapProperties, Scope.CLUSTER); } catch (IOException ioException) { logger.error("Error while persisting the state to NiFi", ioException); throw new ProcessException( "The state(LSN) couldn't be persisted", ioException); } ... ... ... }
I don't get any exception or even a log error entry, the processor continues to run.
The following load code always returns a null value(Retrieved the statemap : {})for the persisted field :
try { stateMap = stateManager.getState(Scope.CLUSTER); stateMapProperties = new HashMap<>(stateMap.toMap()); logger.debug("Retrieved the statemap : "+stateMapProperties); lastMaxLSN = (stateMapProperties .get(ProcessorConstants.LAST_MAX_LSN) == null || stateMapProperties .get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null : stateMapProperties.get( ProcessorConstants.LAST_MAX_LSN).getBytes(); logger.debug("Attempted to load the previous lsn from NiFi state : " + lastMaxLSN); } catch (IOException ioe) { logger.error("Couldn't load the state map", ioe); throw new ProcessException(ioe); }
I am wondering if the ZK is at fault or have I missed something while using the State Map !
Created 03-20-2017 02:56 PM
I answered this question on stackoverflow:
https://stackoverflow.com/questions/42902718/state-manager-not-persisting-retrieving-data
Created 03-20-2017 02:20 PM
Can you edit your question to include the code you're using to store the value into the state map (and update the StateManager with it)?
Created 03-20-2017 02:55 PM
I believe you need to set an initial state. It looks like you only call the `replace` method on the state manager. I believe it is necessary to initially call `setState`. An example of this can be found here
Created 03-20-2017 02:56 PM
I answered this question on stackoverflow:
https://stackoverflow.com/questions/42902718/state-manager-not-persisting-retrieving-data