
Verify data replication in different data centers using NiFi

Explorer

Hi,

I have 3 data centers: dc1, dc2, and dc3. About 100 million records were migrated to dc1 and replicated to both dc2 and dc3.

My goal is to find the records that were not replicated from dc1 to dc2 and dc3.

 

Thanks in advance.

 

6 REPLIES

Master Mentor

@sahil0915 

I don't know that this is a good use case for NiFi.  At its most basic level, NiFi is designed to automate the movement of data between systems.  Between the ingest and egress of data, NiFi provides a variety of components for routing, enriching, and modifying that data.

So trying to use NiFi to compare the data existing in multiple data centers is not a good fit.

 

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.

Thank you,

Matt




Explorer

Thanks for the reply @MattWho 

Can't we achieve this using the DetectDuplicate processor?

I did a POC first on only 2 tables. Let's say we have only 10 records in table1 and copied 8 of them to table2.

I fetch all the records from both tables using the QueryDatabaseTable processor and run each flow file through the CryptographicHashContent and DetectDuplicate processors. However, the results are not as expected. I have pasted the template file below; please have a look and let me know whether we could detect the records that exist only once across the flow files or text files (if I fetch the records from files).

 

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.3">
<description></description>
<groupId>018a1075-885b-197d-61c4-6910ed948be3</groupId>
<name>consistencyCheck approach 1</name>
<snippet>
<processors>
<id>c18e2f53-136e-3e5b-0000-000000000000</id>
<parentGroupId>2629c87c-99e6-3b17-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-avro-nar</artifact>
<group>org.apache.nifi</group>
<version>1.22.0</version>
</bundle>
<config>
<backoffMechanism>PENALIZE_FLOWFILE</backoffMechanism>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JSON container options</key>
<value>
<name>JSON container options</name>
</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>
<name>Wrap Single Record</name>
</value>
</entry>
<entry>
<key>Avro schema</key>
<value>
<name>Avro schema</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<maxBackoffPeriod>10 mins</maxBackoffPeriod>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JSON container options</key>
<value>array</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>false</value>
</entry>
<entry>
<key>Avro schema</key>
</entry>
</properties>
<retryCount>10</retryCount>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ConvertAvroToJSON</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
<retry>false</retry>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
<retry>false</retry>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.avro.ConvertAvroToJSON</type>
</processors>
</snippet>
<timestamp>08/19/2023 21:23:10 IST</timestamp>
</template>

 

Master Mentor

@sahil0915 

What you are proposing would require you to ingest all ~100 million records from DC2 into NiFi, hash each record, and write all ~100 million hashes to a map cache like Redis or HBase (which you would also need to install somewhere) using the PutDistributedMapCache processor. You would then ingest all ~100 million records from DC1, hash those records, and finally compare their hashes against the ones in the distributed map cache using DetectDuplicate. Any records routed to non-duplicate would represent what is not in DC2.

Then you would have to flush your distributed map cache and repeat the process, this time writing the hashes from DC3 to the cache.

I suspect this is going to perform poorly.  You would have NiFi ingesting ~300 million records just to create hashes for a one-time comparison.
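For scale reference, the comparison being described here is essentially a set difference over record hashes. A minimal sketch of that logic in plain Python (a set stands in for the distributed map cache, and the record layout is hypothetical):

```python
import hashlib

def record_hash(record: dict) -> str:
    # Hash a canonical string form of the record, analogous to CryptographicHashContent
    canonical = "|".join(f"{k}={record[k]}" for k in sorted(record))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def missing_from_replica(source_records, replica_records):
    # The replica's hashes play the role of the distributed map cache;
    # a source record whose hash is absent is DetectDuplicate's "non-duplicate"
    replica_hashes = {record_hash(r) for r in replica_records}
    return [r for r in source_records if record_hash(r) not in replica_hashes]

dc1 = [{"id": i, "value": f"v{i}"} for i in range(10)]
dc2 = [{"id": i, "value": f"v{i}"} for i in range(8)]  # two records never replicated
print([r["id"] for r in missing_from_replica(dc1, dc2)])  # → [8, 9]
```

Holding ~100 million hashes per pass is exactly the load described above, which is why an external cache like Redis or HBase would be needed rather than in-process memory.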

 

 


Thank you,

Matt


@sahil0915 It is not clear what you are asking for here.   

 

If you use NiFi to do this replication, you will be well aware of any records that fail, as that is the nature of NiFi and how it works.  NiFi data flows capture failures, so you could easily see any records that did not make it from dc1 to dc2 and dc3.  Additionally, NiFi handles retries, so a replication flow should be resilient to failures and notify you at that time, rather than requiring a full audit after replication.

 

If you are using a database technology that replicates across regions, or some other replication method, and intend to use NiFi to check whether the replication is complete and accurate, you will need to build a NiFi flow that pulls all 3 data sets and compares them.  At 100 million rows, this could be a pretty heavy process, with 3 copies of all the data coming into NiFi.  It would make more sense to me to let NiFi handle the replication as described above and take advantage of its inherent fault tolerance.

Explorer

I am not looking to replicate from NiFi to different data centers. I want to check for missing records in the data that has already been replicated to the data centers.

Thanks


@sahil0915 As @MattWho already pointed out, what you are trying to achieve is maybe not the best use case for NiFi. Nevertheless, if you still want to pursue this idea, prepare some resources, because you might need them.

What you can try is divided into two separate flows. (PS: I did not test whether everything works as expected, but this is what I would do.)
- The first flow tries to identify whether the number of rows in dc1 equals the number of rows in dc2 and dc3. You can do that by linking several processors. First, use an ExecuteSQLRecord to execute "select count(*) from your_table" on dc1. From its success relationship, add an ExtractText processor in which you define a property named "dc_1_count" with the value "(.*)"; this saves the count into an attribute. The success relationship then goes into another ExecuteSQLRecord, which executes the same "select count(*) from your_table" on dc2; from success, use another ExtractText to save the value into an attribute named dc_2_count. From there, go into a third ExecuteSQLRecord that executes the same select on dc3, and finally extract the value into dc_3_count with another ExtractText.
- Next, create a RouteOnAttribute processor and add some rules in its advanced settings: if dc_1_count > dc_2_count, route to a dedicated queue; if dc_1_count > dc_3_count, route to another dedicated queue; and if dc_1_count equals both dc_2_count and dc_3_count, route to success and end the flow.
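The count-comparison flow above boils down to three COUNT queries plus routing. A rough sketch of the same logic in Python (in-memory SQLite databases stand in for the three data centers; the table name and routing labels are made up):

```python
import sqlite3

def compare_counts(conn_dc1, conn_dc2, conn_dc3, table="your_table"):
    # Run "select count(*)" on each data center, like the three ExecuteSQLRecord steps
    counts = {}
    for name, conn in (("dc_1", conn_dc1), ("dc_2", conn_dc2), ("dc_3", conn_dc3)):
        counts[f"{name}_count"] = conn.execute(f"select count(*) from {table}").fetchone()[0]
    # RouteOnAttribute-style rules over the extracted attributes
    if counts["dc_1_count"] > counts["dc_2_count"]:
        return "dc_2_queue", counts
    if counts["dc_1_count"] > counts["dc_3_count"]:
        return "dc_3_queue", counts
    return "success", counts

def demo_dc(rows):
    # In-memory stand-in for one data center's database
    conn = sqlite3.connect(":memory:")
    conn.execute("create table your_table (id integer primary key)")
    conn.executemany("insert into your_table values (?)", [(i,) for i in range(rows)])
    return conn

route, counts = compare_counts(demo_dc(10), demo_dc(8), demo_dc(10))
print(route, counts["dc_2_count"])  # → dc_2_queue 8
```

A count match only tells you the totals agree; it cannot prove the same rows are present, which is why the second flow below is still needed.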

Now, the second part covers the queues for the case where the counts differ. If the count is not the same, you will have to execute an SQL statement on dc2 that extracts all the rows from your database. You then split this file into several other flow files, each containing 1 row per record. These new flow files go into a LookupRecord processor, which is connected to the dc1 database and looks up the values present in the record. If the value is there, you can discard that flow file; otherwise, you can use a PutEmail or another processor to get alerted.
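The per-row check in that second part — extract every row, then look each one up in the other database — can be sketched the same way (SQLite stand-ins again; the table and key names are hypothetical, and in the real flow the miss branch would feed PutEmail rather than return a list):

```python
import sqlite3

def rows_missing(conn_extract, conn_lookup, table="your_table", key="id"):
    # Extract all rows from one database (the "select *" plus split step),
    # then look each one up in the other, like LookupRecord
    missing = []
    for row in conn_extract.execute(f"select * from {table}"):
        hit = conn_lookup.execute(
            f"select 1 from {table} where {key} = ?", (row[0],)
        ).fetchone()
        if hit is None:  # lookup miss: this row was not replicated -> alert
            missing.append(row)
    return missing

def demo_dc(rows):
    # In-memory stand-in for one data center's database
    conn = sqlite3.connect(":memory:")
    conn.execute("create table your_table (id integer primary key)")
    conn.executemany("insert into your_table values (?)", [(i,) for i in range(rows)])
    return conn

dc1, dc2 = demo_dc(10), demo_dc(8)
print([r[0] for r in rows_missing(dc1, dc2)])  # → [8, 9]
```

Which connection you extract from determines the direction of the check: extracting from dc1 and looking up in dc2 finds dc1 rows that never reached dc2, and vice versa.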

ATTENTION: doing this will require lots of resources (RAM, CPU, heap memory), and the OS configuration should allow you to work with millions of open files. Otherwise, I do not recommend trying this.

For testing purposes, start with a table that has 10-20 rows, and then continue to bigger tables.