
Erroneous persistence of data via State Manager


Expert Contributor

NiFi 1.2.0, two nodes.

I have a custom processor that persists some bytes via the State API. The relevant code snippet follows:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final StateManager stateManager = context.getStateManager();
    try {
        final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
        final Map<String, String> newStateMapProperties = new HashMap<>();
        // new String(byte[]) decodes the LSN bytes with the platform default charset
        newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN, new String(lsnUsedDuringLastLoad));
        logger.debug("Persisting stateMap : " + newStateMapProperties);
        if (stateMap.getVersion() == -1) {
            // Version -1 means no state has been stored yet
            stateManager.setState(newStateMapProperties, Scope.CLUSTER);
        } else {
            // replace() returns false (and changes nothing) if the state was modified
            // since getState(); the return value is not checked here
            stateManager.replace(stateMap, newStateMapProperties, Scope.CLUSTER);
        }
    } catch (IOException ioException) {
        logger.error("Error while persisting the state to NiFi", ioException);
        throw new ProcessException("The state(LSN) couldn't be persisted", ioException);
    }
    // ...
}

private boolean writeDataFromChangeTablesToFlowFiles(ProcessContext context, ProcessSession session) {
    // ...
    final StateManager stateManager = context.getStateManager();
    final StateMap stateMap;
    final Map<String, String> stateMapProperties;
    // ...
    stateMap = stateManager.getState(Scope.CLUSTER);
    stateMapProperties = new HashMap<>(stateMap.toMap());
    logger.debug("Retrieved the statemap : " + stateMapProperties);
    final String storedLsn = stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN);
    // String.getBytes() encodes with the platform default charset
    lastMaxLSN = (storedLsn == null || storedLsn.isEmpty()) ? null : storedLsn.getBytes();
    // ...
}
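The snippet above turns the LSN bytes into a String with `new String(lsnUsedDuringLastLoad)` and back with `getBytes()`, and both calls use the platform's default charset. If the LSN is arbitrary binary data (an assumption; the thread does not say how it is produced), that round trip is not guaranteed to return the same bytes, and the default charset can differ between a local machine and the cluster nodes. A minimal sketch, assuming it is acceptable to store the LSN as hex text in the state map; `LsnStateEncoding`, `toHex` and `fromHex` are hypothetical names, not part of the NiFi API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch: storing binary LSN bytes in a String-valued state map without loss.
// LsnStateEncoding, toHex and fromHex are hypothetical helpers, not NiFi API.
public class LsnStateEncoding {

    // Encode each byte as two lowercase hex digits, so every byte value survives.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Decode a string produced by toHex back into the original bytes.
    static byte[] fromHex(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Hex string must have an even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical 10-byte LSN; 0x9B on its own is not a valid UTF-8 sequence.
        byte[] lsn = {0x00, 0x00, 0x00, 0x2A, 0x00, 0x00, 0x01, (byte) 0x9B, 0x00, 0x03};

        // Round trip through a charset, as new String(bytes) / getBytes() do.
        // UTF-8 is used explicitly so the result does not depend on the platform.
        byte[] viaCharset = new String(lsn, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
        System.out.println("charset round trip preserved bytes: " + Arrays.equals(lsn, viaCharset)); // false

        // Round trip through hex text.
        String stored = toHex(lsn);
        System.out.println("stored value: " + stored); // 0000002a0000019b0003
        System.out.println("hex round trip preserved bytes: " + Arrays.equals(lsn, fromHex(stored))); // true
    }
}
```

In the processor, this would mean putting `toHex(lsnUsedDuringLastLoad)` into the state map and reading it back with `fromHex(storedLsn)`. `java.util.Base64` would work equally well; hex was chosen here only because it is easy to read in the state view.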

On my local machine, the processor runs fine for days at a time: data is picked up whenever it is inserted, updated, or deleted.

When the NAR was first deployed, the processor worked fine and fetched the data it was supposed to. After some time (probably as early as the second execution), it simply stopped pulling data. I checked the processor's state and I am not sure it looks correct; the LSN bytes should be stored as a single value shared by all nodes:

[Screenshot: 15878-processor-states-multi-node.jpg]

Now, if I stop the processor, clear its state, and start it again, it starts fetching data again.

What logical mistake have I made?
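In the snippet, the result of `stateManager.replace(...)` is not checked. According to the NiFi `StateManager` Javadoc, `replace(oldValue, newValue, scope)` updates the state only if it still equals the `StateMap` passed in and returns `false` otherwise, so a failed replace leaves the old LSN in place without raising an error. The toy model below uses hypothetical classes (`VersionedStateDemo`, `Store`, `Snapshot`), not NiFi code, to illustrate that compare-and-set behavior when two callers work from the same snapshot:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a versioned state store with compare-and-set semantics.
// Hypothetical classes for illustration only; this is not NiFi's StateManager.
public class VersionedStateDemo {

    // Immutable snapshot: a version number plus the key/value pairs read at that version.
    static final class Snapshot {
        final long version;
        final Map<String, String> values;

        Snapshot(long version, Map<String, String> values) {
            this.version = version;
            this.values = values;
        }
    }

    static final class Store {
        private Snapshot current = new Snapshot(-1, new HashMap<>()); // -1 = nothing stored yet

        synchronized Snapshot get() {
            return current;
        }

        // Unconditional write; bumps the version.
        synchronized void set(Map<String, String> values) {
            current = new Snapshot(current.version + 1, new HashMap<>(values));
        }

        // Conditional write: succeeds only if nobody changed the state since 'expected' was read.
        synchronized boolean replace(Snapshot expected, Map<String, String> values) {
            if (expected.version != current.version) {
                return false;
            }
            current = new Snapshot(current.version + 1, new HashMap<>(values));
            return true;
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        Map<String, String> first = new HashMap<>();
        first.put("lastMaxLsn", "0000002a");
        store.set(first);

        Snapshot readByA = store.get(); // caller A reads version 0
        Snapshot readByB = store.get(); // caller B also reads version 0

        Map<String, String> fromA = new HashMap<>();
        fromA.put("lastMaxLsn", "0000002b");
        boolean aOk = store.replace(readByA, fromA);

        Map<String, String> fromB = new HashMap<>();
        fromB.put("lastMaxLsn", "0000002c");
        boolean bOk = store.replace(readByB, fromB); // stale snapshot, so this fails

        System.out.println("A updated: " + aOk + ", B updated: " + bOk);
        System.out.println("stored: " + store.get().values.get("lastMaxLsn"));
    }
}
```

If B ignores the `false` return value, as the snippet does with `replace`, it carries on as if its LSN had been saved. Logging or retrying on `false` would at least make that situation visible.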

**********Edit-1*********

I configured the processor to run only on the primary node, yet I have the same issue.

**********Edit-2**********

The state-management.xml on l4513t:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<stateManagement>
    <!--
        State Provider that stores state locally in a configurable directory. This Provider requires the following properties:
        Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it
                    is important that the directory be copied over to the new version when upgrading NiFi.
         Always Sync - If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very
                 expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the
                 operating system crashes. The default value is false.
         Partitions - The number of partitions.
         Checkpoint Interval - The amount of time between checkpoints.
     -->
    <local-provider>
        <id>local-provider</id>
        <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
        <property name="Directory">./state/local</property>
         <property name="Always Sync">false</property>
         <property name="Partitions">16</property>
         <property name="Checkpoint Interval">2 mins</property>
    </local-provider>
    <!--
        State Provider that is used to store state in ZooKeeper. This Provider requires the following properties:
        Root Node - the root node in ZooKeeper where state should be stored. The default is '/nifi', but it is advisable to change this to a different value if not using
                   the embedded ZooKeeper server and if multiple NiFi instances may all be using the same ZooKeeper Server.
        Connect String - A comma-separated list of host:port pairs to connect to ZooKeeper. For example, myhost.mydomain:2181,host2.mydomain:5555,host3:6666
        Session Timeout - Specifies how long this instance of NiFi is allowed to be disconnected from ZooKeeper before creating a new ZooKeeper Session. Default value is "30 seconds"
        Access Control - Specifies which Access Controls will be applied to the ZooKeeper ZNodes that are created by this State Provider. This value must be set to one of:
                            - Open  : ZNodes will be open to any ZooKeeper client.
                            - CreatorOnly  : ZNodes will be accessible only by the creator. The creator will have full access to create children, read, write, delete, and administer the ZNodes.
                                             This option is available only if access to ZooKeeper is secured via Kerberos or if a Username and Password are set.
        <property name="Connect String">l4513t.sss.se.com:2181,l4514t.sss.se.com:2181</property>
    -->
    <cluster-provider>
        <id>zk-provider</id>
        <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
        <property name="Connect String">l4373t.sss.se.com:2181,l4283t.sss.se.com:2181,l4284t.sss.se.com:2181</property>
        <property name="Root Node">/nifi</property>
        <property name="Session Timeout">10 seconds</property>
        <property name="Access Control">Open</property>
    </cluster-provider>
</stateManagement>

The state-management.xml on l4514t:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
  This file provides a mechanism for defining and configuring the State Providers
  that should be used for storing state locally and across a NiFi cluster. In order
  to use a specific provider, it must be configured here and its identifier
  must be specified in the nifi.properties file.
-->
<stateManagement>
    <!--
        State Provider that stores state locally in a configurable directory. This Provider requires the following properties:
        Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it
                    is important that the directory be copied over to the new version when upgrading NiFi.
          Always Sync - If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very
                  expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the
                  operating system crashes. The default value is false.
          Partitions - The number of partitions.
          Checkpoint Interval - The amount of time between checkpoints.
     -->
    <local-provider>
        <id>local-provider</id>
        <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
        <property name="Directory">./state/local</property>
          <property name="Always Sync">false</property>
          <property name="Partitions">16</property>
          <property name="Checkpoint Interval">2 mins</property>
    </local-provider>
    <!--
        State Provider that is used to store state in ZooKeeper. This Provider requires the following properties:
        Root Node - the root node in ZooKeeper where state should be stored. The default is '/nifi', but it is advisable to change this to a different value if not using
                   the embedded ZooKeeper server and if multiple NiFi instances may all be using the same ZooKeeper Server.
        Connect String - A comma-separated list of host:port pairs to connect to ZooKeeper. For example, myhost.mydomain:2181,host2.mydomain:5555,host3:6666
        Session Timeout - Specifies how long this instance of NiFi is allowed to be disconnected from ZooKeeper before creating a new ZooKeeper Session. Default value is "30 seconds"
        Access Control - Specifies which Access Controls will be applied to the ZooKeeper ZNodes that are created by this State Provider. This value must be set to one of:
                            - Open  : ZNodes will be open to any ZooKeeper client.
                            - CreatorOnly  : ZNodes will be accessible only by the creator. The creator will have full access to create children, read, write, delete, and administer the ZNodes.
                                             This option is available only if access to ZooKeeper is secured via Kerberos or if a Username and Password are set.
        <property name="Connect String">l4513t.sss.se.com:2181,l4514t.sss.se.com:2181</property>
    -->
    <cluster-provider>
        <id>zk-provider</id>
        <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
        <property name="Connect String">l4373t.sss.se.com:2181,l4283t.sss.se.com:2181,l4284t.sss.se.com:2181</property>
        <property name="Root Node">/nifi</property>
        <property name="Session Timeout">10 seconds</property>
        <property name="Access Control">Open</property>
    </cluster-provider>
</stateManagement>
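To check mechanically that both nodes point at the same cluster state provider (same `id`, class, Connect String and Root Node), the `<cluster-provider>` block of each file can be extracted and compared. A minimal sketch using only the JDK's DOM parser; `ClusterProviderCheck` and its methods are hypothetical names:

```java
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Sketch: pull the <cluster-provider> settings out of state-management.xml so the
// files from two nodes can be compared. Class and method names are hypothetical.
public class ClusterProviderCheck {

    static Map<String, String> clusterProvider(String xml) throws Exception {
        Element root = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)))
                .getDocumentElement();
        NodeList providers = root.getElementsByTagName("cluster-provider");
        if (providers.getLength() == 0) {
            throw new IllegalArgumentException("No <cluster-provider> element found");
        }
        Element provider = (Element) providers.item(0);
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("id", text(provider, "id"));
        settings.put("class", text(provider, "class"));
        // Only real <property> elements are returned; the commented-out
        // Connect String inside <!-- --> is a comment node and is skipped.
        NodeList props = provider.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            Element p = (Element) props.item(i);
            settings.put(p.getAttribute("name"), p.getTextContent().trim());
        }
        return settings;
    }

    // Trimmed text of the first child element with the given tag, or "" if absent.
    private static String text(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? "" : nodes.item(0).getTextContent().trim();
    }

    static String read(String path) throws Exception {
        return new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: java ClusterProviderCheck <node1 state-management.xml> <node2 state-management.xml>");
            return;
        }
        Map<String, String> a = clusterProvider(read(args[0]));
        Map<String, String> b = clusterProvider(read(args[1]));
        System.out.println(args[0] + ": " + a);
        System.out.println(args[1] + ": " + b);
        System.out.println(a.equals(b) ? "cluster-provider settings match" : "cluster-provider settings DIFFER");
    }
}
```

For the two files above, the `<cluster-provider>` blocks read identically by eye, so a check like this would be expected to report a match; it is mainly useful when the files are longer or differ in whitespace and comments.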

Re: Erroneous persistence of data via State Manager

Can you provide the contents of your state-management.xml (blocking out anything sensitive)?

Is there any part of the code where you use Scope.LOCAL?

For example:

stateManager.setState(newStateMapProperties, Scope.LOCAL);

or

stateManager.replace(stateMap, newStateMapProperties, Scope.LOCAL);
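The reason for asking about `Scope.LOCAL`: local state is kept separately by each node, while cluster state is a single value shared by all nodes, so differing per-node values would be expected only if something writes with LOCAL scope. A toy model of that difference, using hypothetical classes (`ScopeDemo`, `ToyStateManager`) rather than NiFi's implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not NiFi's implementation) of the two state scopes:
// LOCAL keeps a separate map per node, CLUSTER keeps one map shared by every node.
public class ScopeDemo {

    enum Scope { LOCAL, CLUSTER }

    static final class ToyStateManager {
        private final Map<String, Map<String, String>> localByNode = new HashMap<>();
        private final Map<String, String> cluster = new HashMap<>();

        // Replace the whole state for the given scope (per node for LOCAL).
        void setState(String node, Map<String, String> values, Scope scope) {
            if (scope == Scope.CLUSTER) {
                cluster.clear();
                cluster.putAll(values);
            } else {
                localByNode.put(node, new HashMap<>(values));
            }
        }

        // Return a copy of the state visible to the given node in the given scope.
        Map<String, String> getState(String node, Scope scope) {
            return scope == Scope.CLUSTER
                    ? new HashMap<>(cluster)
                    : new HashMap<>(localByNode.getOrDefault(node, new HashMap<>()));
        }
    }

    public static void main(String[] args) {
        ToyStateManager mgr = new ToyStateManager();
        Map<String, String> v1 = new HashMap<>();
        v1.put("lastMaxLsn", "01");
        Map<String, String> v2 = new HashMap<>();
        v2.put("lastMaxLsn", "02");

        // Each node writes its own value with LOCAL scope: two different values coexist.
        mgr.setState("node1", v1, Scope.LOCAL);
        mgr.setState("node2", v2, Scope.LOCAL);
        System.out.println("LOCAL   node1=" + mgr.getState("node1", Scope.LOCAL)
                + " node2=" + mgr.getState("node2", Scope.LOCAL));

        // With CLUSTER scope the second write overwrites the first, and both nodes see it.
        mgr.setState("node1", v1, Scope.CLUSTER);
        mgr.setState("node2", v2, Scope.CLUSTER);
        System.out.println("CLUSTER node1=" + mgr.getState("node1", Scope.CLUSTER)
                + " node2=" + mgr.getState("node2", Scope.CLUSTER));
    }
}
```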

Re: Erroneous persistence of data via State Manager

Expert Contributor

I have added the state-management.xml from both nodes; can you check it?

The word 'LOCAL' does not appear anywhere in the processor code :)
