
CDC-like Nifi flow from SQL Server to Kafka


Hello,

 

We are currently trying to fetch changes from SQL Server change tracking tables and push them into a Kafka topic with NiFi.

The flow basically works, but we would like feedback on how to make it more reliable and better optimized.

 

Here is the SQL used to fetch changes to MY_TABLE starting from offset 37979520:

 

 

SELECT 
	CT.SYS_CHANGE_VERSION,
	CASE CT.SYS_CHANGE_OPERATION 
		WHEN 'D' THEN CT.T_PK 
		ELSE NULL 
	END as DeletedPK,
	CHANGE_TRACKING_CURRENT_VERSION() as LastTransactionId,
	T.*
FROM CHANGETABLE(CHANGES MY_TABLE, 37979520) CT 
LEFT JOIN MY_TABLE T on CT.T_PK = T.T_PK

 

 

Here is the NiFi implementation:

nifi.png

(bigger picture in attachment)

 

To avoid concurrency issues, all processors run on the primary node only, with a single concurrent task.

Here are the steps:

 

1 - GenerateFlowFile

To start the process, I generate a FlowFile with a custom property "increment.value" = cache_key.

 

2 - FetchDistributedMapCache

I fetch the distributed cache with cache identifier ${increment.value} and I put the result in the "stored.state" attribute.

 

3 - UpdateAttribute

To manage initialization, I update "stored.state" with the following expression:

 

${stored.state:isNull():ifElse(0, ${stored.state})}

 

=> If the cache is empty, I start from 0.
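
For readers less familiar with NiFi Expression Language, the initialization rule above can be sketched in Python. This is only an illustration: the function name `initial_offset` is hypothetical, and the attribute is modeled as an optional string because NiFi attributes are strings.

```python
from typing import Optional


def initial_offset(stored_state: Optional[str]) -> str:
    """Rough equivalent of ${stored.state:isNull():ifElse(0, ${stored.state})}."""
    # isNull() is true only when the attribute is missing entirely.
    # An attribute that exists but holds an empty string is kept as-is.
    return "0" if stored_state is None else stored_state
```

Note that an empty string is passed through unchanged here, mirroring isNull(). If an empty value could ever end up in "stored.state", isEmpty() might be the closer check, but that is an assumption about the cache contents.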

 

4 - ExecuteSQL

SQL statement to fetch data from the current offset

 

 

SELECT 
	CT.SYS_CHANGE_VERSION,
	CASE CT.SYS_CHANGE_OPERATION 
		WHEN 'D' THEN CT.T_PK 
		ELSE NULL 
	END as DeletedPK,
	CHANGE_TRACKING_CURRENT_VERSION() as LastTransactionId,
	T.*
FROM CHANGETABLE(CHANGES MY_TABLE, ${stored.state}) CT 
LEFT JOIN MY_TABLE T on CT.T_PK = T.T_PK

 

 

5 - QueryRecord

 

The "Include Zero Record FlowFiles" property is set to false to avoid forwarding empty datasets downstream.

The selectLastTransactionId relationship fetches the last offset with:

 

select max(LastTransactionId) as last_transaction_id from flowfile

 

The selectData relationship forwards non-empty FlowFiles to Kafka with:

 

select * from flowfile

 

 

6A - EvaluateJsonPath

 

I extract $.[0].last_transaction_id to forward the next offset to the PutDistributedMapCache processor.
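
The extraction in this step can be sketched in Python as follows. It assumes the FlowFile content is a JSON array of records with a "last_transaction_id" field; the function name `extract_next_offset` is hypothetical.

```python
import json
from typing import Optional


def extract_next_offset(flowfile_json: str) -> Optional[str]:
    """Mimic the JsonPath $.[0].last_transaction_id on a JSON array of records."""
    records = json.loads(flowfile_json)
    # No records at all: there is nothing to extract.
    if not isinstance(records, list) or not records:
        return None
    value = records[0].get("last_transaction_id")
    # A JSON null means the aggregate had no rows to work on.
    return None if value is None else str(value)
```

The null guard may matter in practice: in standard SQL, an aggregate such as max() without GROUP BY returns a single row containing NULL when its input is empty. Whether QueryRecord behaves the same way in this flow is an assumption that would need checking.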

 

6B - PublishKafkaRecord

 

7A - PutDistributedMapCache

 

I update the cache with the new offset.
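
Taken together, steps 2 to 7A amount to the loop sketched below in Python. This is a simplified model, not the NiFi flow itself: a plain dict stands in for the distributed map cache, and `fetch_changes` and `publish` are hypothetical callables standing in for ExecuteSQL and PublishKafkaRecord. Unlike the flow, where the Kafka and cache branches run side by side, the sketch publishes first and only then stores the new offset.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


def run_cycle(
    cache: Dict[str, str],
    cache_key: str,
    fetch_changes: Callable[[int], List[Row]],
    publish: Callable[[List[Row]], None],
) -> Optional[int]:
    """Run one polling cycle and return the new offset, or None if nothing changed."""
    # Steps 2-3: read the stored offset, starting from 0 when the cache is empty.
    offset = int(cache.get(cache_key, "0"))
    # Step 4: fetch every change newer than the stored offset.
    rows = fetch_changes(offset)
    # Step 5: an empty dataset is dropped, so the cache is left untouched.
    if not rows:
        return None
    # Step 6B: forward the changes (Kafka in the real flow).
    publish(rows)
    # Steps 6A-7A: compute the next offset and store it in the cache.
    new_offset = max(int(row["LastTransactionId"]) for row in rows)
    cache[cache_key] = str(new_offset)
    return new_offset
```

Calling `run_cycle` repeatedly with the same cache should publish each change once and return None when there is nothing new. Storing the offset only after a successful publish means a failed publish is retried from the old offset on the next cycle, at the cost of possible duplicates.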

 

------------------------

 

Here are some questions:

- Is there a better way to safely start the process? A GenerateFlowFile with a long Timer Driven run schedule is not ideal.

 

- Despite "Include Zero Record FlowFiles" being set to false on the QueryRecord processor, I still see activity downstream for empty datasets coming from the ExecuteSQL processor.

This triggers many exceptions at the PutDistributedMapCache level:

nifi-ex.png

How is this possible?

 

- How can we make this flow more reliable? Is there a better way to orchestrate such a flow?

Many thanks!


nifi.png
