NiFi state mgt., ambiguous behaviour across different machines and set-up


The dev environment runs NiFi 1.2.0 on two RHEL 7 nodes: l4513t.sss.se.com and l4514t.sss.se.com.

I also have a local NiFi 1.2.0 instance on Windows 7, where I develop and test my custom processor.

The following state management snippet stores an LSN, a SQL Server binary value (retrieved and used as a byte array in Java):

final StateManager stateManager = context.getStateManager();
try {
    StateMap stateMap = stateManager.getState(Scope.CLUSTER);
    final Map<String, String> newStateMapProperties = new HashMap<>();
    newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN, new String(lsnUsedDuringLastLoad));
    logger.debug("Persisting stateMap : " + newStateMapProperties);
    if (stateMap.getVersion() == -1) {
        stateManager.setState(newStateMapProperties, Scope.CLUSTER);
    } else {
        stateManager.replace(stateMap, newStateMapProperties, Scope.CLUSTER);
    }
} catch (IOException ioException) {
    logger.error("Error while persisting the state to NiFi", ioException);
    throw new ProcessException("The state(LSN) couldn't be persisted", ioException);
}

The code below retrieves the LSN:

final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
final Map<String, String> stateMapProperties;
byte[] lastMaxLSN = null;
try {
    stateMap = stateManager.getState(Scope.CLUSTER);
    stateMapProperties = new HashMap<>(stateMap.toMap());
    logger.debug("Retrieved the statemap : " + stateMapProperties);
    lastMaxLSN = (stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN) == null
            || stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
                    : stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN).getBytes();
} catch (IOException ioe) {
    logger.error("Couldn't load the state map", ioe);
    throw new ProcessException(ioe);
}

The processor works smoothly on the local Windows 7 machine, where the LSN is stored and retrieved many times without any errors.

The problem surfaced when I deployed the processor to the dev environment, which consists of the two RHEL 7 nodes mentioned above. The processor executed successfully ONLY ONCE; on later runs it threw no error but did not get the expected LSN, and so did no work.

To get a complete picture of the situation, I created a test table that records every executed query along with the LSN it used. Again, this worked fine on the local Windows 7 machine; the following are some of the rows inserted into the test table.

Note: focus only on the LSN field, which is a binary(10) column; the rest can be ignored.

[Attached image: 16282-lsn-stored-in-test-table.jpg]

On dev, the processor executes EXACTLY ONCE, either when it is first deployed or after its state is cleared. On the second execution, it retrieves the LSN and tries to store it back in the test table, only to get a JDBC exception:

com.microsoft.sqlserver.jdbc.SQLServerException: String or binary data would be truncated.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:440)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:385)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:328)
at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:105)
at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:105)
at com.datalake.processors.SQLServerCDCProcessorCCC.logCTQueryWithParams(SQLServerCDCProcessorCCC.java:62)
at com.datalake.processors.SQLServerCDCProcessor.writeDataFromChangeTablesToFlowFiles(SQLServerCDCProcessor.java:575)
at com.datalake.processors.SQLServerCDCProcessor.onTrigger(SQLServerCDCProcessor.java:193)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I believe the underlying ZooKeeper (ZK) instance is what stores this state. I am unable to pinpoint the root cause or where it occurs.

***********Edit-1***********

Out of curiosity/desperation, I tried using the UTF-8 charset explicitly.

While storing:

newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN,
                            new String(lsnUsedDuringLastLoad, StandardCharsets.UTF_8));

And while retrieving the bytes:

lastMaxLSN = (stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN) == null
                            || stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
                                    : stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN).getBytes(StandardCharsets.UTF_8);

Now I get the same exception even on the Windows machine; the RHEL error remains the same.

Re: NiFi state mgt., ambiguous behaviour across different machines and set-up

One difference is that your local instance uses the local state provider, which stores data on the filesystem under NIFI_HOME/state, whereas a cluster uses the ZooKeeper state provider, so there is clearly some difference in behavior between the two.

Is the full code of your processor available somewhere?

The JDBC error you got appears to mean that you are trying to insert a value into a column where the value is larger than the size of the defined column:

https://stackoverflow.com/questions/6388756/sql-server-string-or-binary-data-would-be-truncated

Re: NiFi state mgt., ambiguous behaviour across different machines and set-up

The point is that the error occurs only on the Linux machines, and only after the state has been stored. With the state cleared, or on the Windows machine, the JDBC error doesn't occur and the data gets inserted.

Attaching the sample processor code.

Re: NiFi state mgt., ambiguous behaviour across different machines and set-up

Thanks for the processor code.

Are the LSN values being written to the database by another application? If so, how does the other application create the bytes for the value?

I think the issue is related to converting between bytes and strings, and the different encodings on different platforms, but I'm not exactly sure what yet.

The state manager stores a string as bytes using Java's DataOutputStream:

out.writeUTF(entry.getValue());

And when retrieving values it does:

final String value = hasValue ? in.readUTF() : null;
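
To illustrate what that pair of calls does to a String on its own, here is a minimal sketch that writes a value with writeUTF and reads it back with readUTF. The class name, the helper and the sample value are made up for the example and this is not the state provider's actual code; it only suggests that a String handed to these two calls should come back unchanged, which would point at the byte[]-to-String conversion rather than at the serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class, not part of NiFi.
public class WriteUtfRoundTrip {

    // Serialize a string with writeUTF and read it back with readUTF,
    // the same pair of calls quoted above.
    static String roundTrip(String value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeUTF(value);
            }
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                return in.readUTF();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up value: a NUL char, plain ASCII, the U+FFFD replacement char, a control char.
        String value = "\u0000+\uFFFD\u0003";
        System.out.println("String survives writeUTF/readUTF: " + value.equals(roundTrip(value)));
    }
}
```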

When you call getBytes() on a String and don't provide a charset, then it uses the default charset of the platform.
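
A quick way to see which charset that is on each machine is to print it from the same JVM that runs NiFi. This is only a sketch with a made-up class name; the output depends on the platform and on JVM settings such as the file.encoding system property, so I am not showing an expected value.

```java
import java.nio.charset.Charset;

// Hypothetical helper for comparing the Windows and RHEL machines.
public class DefaultCharsetCheck {

    // Name of the charset that new String(byte[]) and getBytes() use implicitly.
    static String defaultCharsetName() {
        return Charset.defaultCharset().name();
    }

    public static void main(String[] args) {
        System.out.println("Default charset: " + defaultCharsetName());
        System.out.println("file.encoding:   " + System.getProperty("file.encoding"));
    }
}
```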

Since Windows 7 works when you don't provide a charset and fails when you provide UTF_8, the Windows 7 default charset is probably something other than UTF_8, which allowed it to work, whereas RHEL 7's default charset probably is UTF_8, which is why it failed. I know this still doesn't explain what the issue is; I'm just trying to think through the different things going on.
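
One thing that can be checked in isolation is whether arbitrary binary data survives a trip through a UTF-8 String at all. The sketch below uses a made-up 10-byte value (not a real LSN from the table) and hypothetical helper names. A byte such as 0xA8 is not valid UTF-8 on its own, so decoding replaces it with U+FFFD, which encodes back as three bytes. Base64 is shown only as one possible charset-independent alternative, not as something the processor already does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical demo class; the byte values are made up for illustration.
public class LsnRoundTrip {

    // bytes -> String -> bytes through UTF-8, as in the Edit-1 variant of the code.
    static byte[] viaUtf8String(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    // Charset-independent alternative: Base64 text can represent any byte value.
    static String encode(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    static byte[] decode(String stored) {
        return Base64.getDecoder().decode(stored);
    }

    public static void main(String[] args) {
        // Made-up 10-byte value; 0xA8 cannot be decoded as UTF-8 by itself.
        byte[] lsn = {0x00, 0x00, 0x00, 0x2B, 0x00, 0x00, 0x01, (byte) 0xA8, 0x00, 0x03};

        byte[] lossy = viaUtf8String(lsn);
        System.out.println("UTF-8 round trip equal:  " + Arrays.equals(lsn, lossy));
        System.out.println("UTF-8 round trip length: " + lossy.length);

        byte[] safe = decode(encode(lsn));
        System.out.println("Base64 round trip equal: " + Arrays.equals(lsn, safe));
    }
}
```

If the retrieved value grows past 10 bytes like this, inserting it into a binary(10) column would be consistent with the "String or binary data would be truncated" error, although I have not confirmed that this is what happens inside the processor.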

Re: NiFi state mgt., ambiguous behaviour across different machines and set-up

Any suggestions to check if the LSN is persisted and retrieved correctly?
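
One possible check, sketched here with hypothetical names, is to log the LSN as a hex string just before it goes into the state map and again just after it is read back, and then compare the two log lines and their lengths. The helpers below are not part of NiFi or of the attached processor, and the sample bytes are made up.

```java
// Hypothetical helper class for logging binary values in a readable form.
public class LsnHex {

    // Render each byte as two uppercase hex digits.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    // Inverse of toHex; expects an even number of hex digits.
    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up 10-byte value standing in for an LSN.
        byte[] lsn = {0x00, 0x00, 0x00, 0x2B, 0x00, 0x00, 0x01, (byte) 0xA8, 0x00, 0x03};
        System.out.println("LSN before persisting: " + toHex(lsn) + " (" + lsn.length + " bytes)");
    }
}
```

A difference between the hex logged before setState and the hex logged after getState would show that the value changes somewhere between the byte array and the stored String. The state could also hold the hex string itself instead of new String(bytes), since it contains only ASCII characters.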
