Hello,
We are currently trying to fetch changes from SQL Server tables (using change tracking) and push them into a Kafka topic with NiFi.
The flow basically works, but we would like feedback on how to make it more reliable and better optimized.
Here is the SQL used to fetch changes to MY_TABLE starting from offset 37979520:
SELECT
    CT.SYS_CHANGE_VERSION,
    CASE CT.SYS_CHANGE_OPERATION
        WHEN 'D' THEN CT.T_PK
        ELSE NULL
    END AS DeletedPK,
    CHANGE_TRACKING_CURRENT_VERSION() AS LastTransactionId,
    T.*
FROM CHANGETABLE(CHANGES MY_TABLE, 37979520) CT
LEFT JOIN MY_TABLE T ON CT.T_PK = T.T_PK
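For completeness, CHANGETABLE only works once change tracking has been enabled on both the database and the table. A minimal sketch (the database name and retention settings below are placeholders, not our actual values):

-- Enable change tracking at the database level (retention values are examples)
ALTER DATABASE MyDatabase
SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);

-- Enable change tracking on the tracked table
ALTER TABLE MY_TABLE ENABLE CHANGE_TRACKING;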
Here is the NiFi implementation (a bigger picture is in the attachment):
To avoid concurrency issues, all processors run on the primary node only, each with a single concurrent task.
Here are the steps:
1 - GenerateFlowFile
To start the process, I generate a FlowFile with a custom property "increment.value" = cache_key.
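For illustration, the configuration looks roughly like this (dynamic properties on GenerateFlowFile become FlowFile attributes; the run schedule shown is an arbitrary example):

GenerateFlowFile (Primary node only, 1 concurrent task, Timer driven, e.g. every 60 sec)
    increment.value = cache_key    (dynamic property, added as a FlowFile attribute)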
2 - FetchDistributedMapCache
I fetch the distributed cache with cache identifier ${increment.value} and put the result in the "stored.state" attribute.
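A sketch of the relevant properties (the cache service name is a placeholder):

FetchDistributedMapCache
    Cache Entry Identifier       = ${increment.value}
    Distributed Cache Service    = DistributedMapCacheClientService
    Put Cache Value In Attribute = stored.state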
3 - UpdateAttribute
To manage initialization, I update "stored.state" with the following expression:
${stored.state:isNull():ifElse(0, ${stored.state})}
=> If the cache is empty, I start from 0.
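As a side note, the same default could be written more compactly with the Expression Language's replaceNull function:

${stored.state:replaceNull(0)}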
4 - ExecuteSQL
The SQL statement fetches data from the current offset:
SELECT
    CT.SYS_CHANGE_VERSION,
    CASE CT.SYS_CHANGE_OPERATION
        WHEN 'D' THEN CT.T_PK
        ELSE NULL
    END AS DeletedPK,
    CHANGE_TRACKING_CURRENT_VERSION() AS LastTransactionId,
    T.*
FROM CHANGETABLE(CHANGES MY_TABLE, ${stored.state}) CT
LEFT JOIN MY_TABLE T ON CT.T_PK = T.T_PK
5 - QueryRecord
The "Include Zero Record FlowFiles" is set to false to avoid dealing with incoming empty datasets.
The relation selectLastTransactionId fetch the last offset with,
select max(LastTransactionId) as last_transaction_id from flowfile
The selectData relationship forwards non-empty FlowFiles to Kafka:
select * from flowfile
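For reference, a sketch of the processor configuration (ExecuteSQL writes Avro, and the downstream EvaluateJsonPath needs JSON, hence the reader/writer choice; the controller service names are placeholders):

QueryRecord
    Record Reader                 = AvroReader
    Record Writer                 = JsonRecordSetWriter
    Include Zero Record FlowFiles = false
    selectLastTransactionId       = select max(LastTransactionId) as last_transaction_id from flowfile
    selectData                    = select * from flowfile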
6A - EvaluateJsonPath
I evaluate $.[0].last_transaction_id to forward the next offset to the PutDistributedMapCache processor.
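A sketch of the configuration, assuming the offset is written into the FlowFile content so that PutDistributedMapCache (which caches the FlowFile content) stores just the scalar value; the dynamic property name is illustrative:

EvaluateJsonPath
    Destination = flowfile-content
    Return Type = auto-detect
    offset      = $.[0].last_transaction_id    (dynamic property)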
6B - PublishKafkaRecord
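A sketch of the publish step (broker, topic and service names are placeholders):

PublishKafkaRecord
    Kafka Brokers      = kafka:9092
    Topic Name         = my_topic
    Record Reader      = JsonTreeReader
    Record Writer      = JsonRecordSetWriter
    Delivery Guarantee = Guarantee Replicated Delivery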
7A - PutDistributedMapCache
I update the cache with the new offset.
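A sketch of the properties (the cache service is the same one used in step 2):

PutDistributedMapCache
    Cache Entry Identifier    = ${increment.value}
    Distributed Cache Service = DistributedMapCacheClientService
    Cache update strategy     = Replace if present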
------------------------
Here are some questions:
- Is there a better way to safely start the process? A GenerateFlowFile with a long Timer Driven Run Schedule is not ideal.
- Despite "Include Zero Record FlowFiles" being set to false on the QueryRecord processor, I still see activity for empty datasets coming from the ExecuteSQL processor.
It triggers many exceptions at the PutDistributedMapCache level.
How is this possible?
- How can we make this flow more reliable? Is there a better way to orchestrate such a flow?
Many thanks !
Reply, created 07-22-2021 10:00 AM:
You can use a QueryDatabaseTableRecord processor to watch for changes and have that trigger your process.
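For example, a rough sketch (this assumes the table has some monotonically increasing column to track; LAST_MODIFIED below is hypothetical, and the processor keeps its own state, so no external cache is needed):

QueryDatabaseTableRecord
    Database Connection Pooling Service = DBCPConnectionPool
    Table Name                          = MY_TABLE
    Maximum-value Columns               = LAST_MODIFIED
    Record Writer                       = JsonRecordSetWriter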
You may also want to try Debezium with Cloudera Kafka, or with Cloudera Flink SQL.
See also:
https://dev.to/tspannhw/simple-change-data-capture-cdc-with-sql-selects-via-apache-nifi-flank-19m4
https://github.com/tspannhw/EverythingApacheNiFi
https://debezium.io/documentation/reference/connectors/sqlserver.html
https://www.linkedin.com/pulse/achieving-incremental-fetch-change-data-capture-via-apache-rajpal/
https://www.datainmotion.dev/2021/02/using-apache-nifi-in-openshift-and.html
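For reference, a Debezium SQL Server connector is registered with Kafka Connect via a JSON payload along these lines (all hostnames, credentials and names below are placeholders; note that Debezium relies on SQL Server's native CDC feature, which must be enabled on the database and table beforehand, see the documentation linked above):

{
  "name": "sqlserver-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "sqlserver-host",
    "database.port": "1433",
    "database.user": "debezium",
    "database.password": "********",
    "database.dbname": "MyDatabase",
    "database.server.name": "myserver",
    "table.include.list": "dbo.MY_TABLE",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.mydatabase"
  }
}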