I have an issue with my QueryDatabaseTable processor, which is showing two state values. One has its scope set to a node in my cluster, while the other has its scope set to the cluster level. Any idea why this may be happening?
Attached is a screenshot for your perusal.
The state is supposed to be stored at Cluster scope so that when a new node in the cluster is elected as primary, the new primary node can read the state and pull only the incremental load.
I suspect that when a node was disconnected from the cluster while the processor was scheduled to run, the processor stored its state locally on that one node.
Did you see any duplicate records pulled from the table?
Thanks @Shu for your response.
Yes, we saw some duplicates in the target table because of this. How do we fix this issue, or what is the ideal way to design this? We execute the QueryDatabaseTable processor on only one node, because when we run it on 2 nodes it produces duplicates. So I believe our QueryDatabaseTable processor was running on the primary node, after which there was a switch and that node wasn't the primary going forward; would this have resulted in this behaviour?
What is the best way to prevent such scenarios? We are going to productionize this stream, and if such issues arise they could cause a lot of data-related problems. Please let me know your thoughts.
As Matt mentioned in the comments, NiFi favours data duplication over data loss, so in scenarios like this we are going to end up with some duplicates in the tables.
By Changing nifi.properties Configs:
NiFi depends on Apache ZooKeeper for determining which node in the cluster should play the role of Primary Node and which node should play the role of Cluster Coordinator.
Check the ZooKeeper and cluster node connection/read timeout configs in your nifi.properties file. If they are set to the default values (or lower), try increasing them and see whether that helps mitigate these node-switching issues.
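These are the properties involved. The values below are only an illustrative increase over the shipped defaults, not a recommendation tuned for your cluster:

```properties
# nifi.properties -- ZooKeeper client timeouts
nifi.zookeeper.connect.timeout=10 secs
nifi.zookeeper.session.timeout=10 secs

# Cluster node communication timeouts
nifi.cluster.node.connection.timeout=30 sec
nifi.cluster.node.read.timeout=30 sec
```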
Some reference docs regarding these timeout settings:
By using Hive to Eliminate Duplicates:
It's always best practice to add an ingestion_timestamp field to each record that is stored into Hive, etc.
Then, while retrieving data from Hive, use the ROW_NUMBER() window function and filter with where row_number = 1 to get only the most recent record.
Use predicate pushdown while selecting the data, so that we minimize the amount of data fed into the window function.
This way, even if the table already contains duplicated data, we still retrieve only the most recent record.
You can also run periodic dedupe jobs (using row_number) on the Hive table, e.g. once a week, to clean up and eliminate the duplicates.
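To make the row_number idea concrete, here is a small runnable sketch. It uses SQLite only because it supports the same ROW_NUMBER() window-function syntax as Hive and runs anywhere; the table and column names are made up for illustration:

```python
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE target (id INTEGER, val TEXT, ingestion_timestamp TEXT)")
# Simulate duplicates: id 1 was pulled twice with different ingestion timestamps.
cur.executemany(
    "INSERT INTO target VALUES (?, ?, ?)",
    [
        (1, "old", "2018-01-01 00:00:00"),
        (1, "new", "2018-01-02 00:00:00"),  # duplicate of id 1, more recent
        (2, "only", "2018-01-01 00:00:00"),
    ],
)

# Keep only the most recent record per id -- the same query shape works in Hive.
rows = cur.execute("""
    SELECT id, val FROM (
        SELECT id, val,
               ROW_NUMBER() OVER (
                   PARTITION BY id
                   ORDER BY ingestion_timestamp DESC
               ) AS rn
        FROM target
    ) t
    WHERE rn = 1
    ORDER BY id
""").fetchall()
print(rows)  # [(1, 'new'), (2, 'only')]
```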
References regarding the implementation of the row_number function:
By Storing the state in Hive/HBase/HDFS:
Give this approach a try as well!
Instead of storing the state in NiFi, use Hive/HBase/HDFS to store your state (this is an extra step, and it introduces a dependency on those services; if they are down, we cannot fetch the last state). Fetch the state from the chosen storage, pass it to the ExecuteSQL processor, and pull the data incrementally from the RDBMS tables.
Reference regarding storing the state and fetching it again:
In the first example I stored the state in DistributedMapCache; change the state storage to Hive/HBase/HDFS, and on the next run prepare the last-state and new-state attributes and pass them to the ExecuteSQL processor.
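A minimal sketch of that external-state pattern, using SQLite to stand in for both the source RDBMS and the state store (in reality the state would live in Hive/HBase/HDFS, and the incremental query would be built with NiFi expression language and handed to ExecuteSQL; all table and column names here are hypothetical):

```python
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
# Hypothetical source table and external state table.
cur.execute("CREATE TABLE src (id INTEGER, payload TEXT)")
cur.execute("CREATE TABLE state (table_name TEXT PRIMARY KEY, max_id INTEGER)")
cur.executemany("INSERT INTO src VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])
cur.execute("INSERT INTO state VALUES ('src', 0)")

def incremental_pull(cur):
    # 1. Fetch the last recorded state from the external store.
    (last_id,) = cur.execute(
        "SELECT max_id FROM state WHERE table_name = 'src'").fetchone()
    # 2. Pull only rows beyond that state (this is the query ExecuteSQL would run).
    rows = cur.execute(
        "SELECT id, payload FROM src WHERE id > ? ORDER BY id", (last_id,)).fetchall()
    # 3. Persist the new state so the next run (on whichever node) continues from here.
    if rows:
        cur.execute("UPDATE state SET max_id = ? WHERE table_name = 'src'",
                    (rows[-1][0],))
    return rows

first = incremental_pull(cur)   # pulls ids 1..3
second = incremental_pull(cur)  # nothing new, so nothing is pulled
cur.execute("INSERT INTO src VALUES (4, 'd')")
third = incremental_pull(cur)   # pulls only id 4
print(first, second, third)
```

Because the state lives outside NiFi, a primary-node switch does not matter: whichever node runs next reads the same state and continues the incremental load.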
Try these approaches and pick the one that is most apt for your scenario.
When a node becomes disconnected from the cluster, it should have stopped any processor with an "Execution" configuration of "Primary node".
Was this processor configured to run on all nodes?
Did a user possibly start this processor while the node was disconnected?
----- When a node is disconnected from the NiFi cluster, a user can access that node's URL directly and make changes. Changes to state (start, stop) will not prevent the node from rejoining the cluster, but configuration changes will.
----- Once a node joins/rejoins the cluster, it will assume the cluster-defined state for each processor.
It may be helpful to carefully review the nifi-user.log and nifi-app.log on that specific node during the timeframe the local state was recorded to see what was going on.
This processor was scheduled to run only on the primary node, as we have observed that running it on all nodes yields duplicates. But yes, there was some instability, and I guess a node switch happened, which may have caused this issue. Having said that, this is a matter of concern for us, as nodes in the cluster will always be going up and down, and ideally NiFi should be able to store the state irrespective of node failures.
What's the best solution you reckon to fix this issue? Can we do some tweaks in the State Management file or something along those lines?
NiFi favors data duplication over data loss.
The primary node may become disconnected for a number of reasons: it failed to process a change request that was replicated to it, it failed to heartbeat, etc.
When a primary node change occurs (ZooKeeper elects the primary node and cluster coordinator), the "Primary node only" processors on the old primary node are no longer scheduled to execute. The node will not kill off any active threads on those processors. At that point the newly elected primary node will start executing those processors.
It is possible the thread running on the old primary node took longer to complete than the new primary node took to start up and pull the last recorded state from ZooKeeper.