<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: CDC-like Nifi flow from SQL Server to Kafka in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/CDC-like-Nifi-flow-from-SQL-Server-to-Kafka/m-p/321358#M228372</link>
    <description>&lt;P&gt;You can use a QueryDatabaseTableRecord processor to watch for changes and have it trigger your process.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You may want to try Debezium with Cloudera Kafka.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You may want to try Debezium with Cloudera Flink SQL.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://dev.to/tspannhw/simple-change-data-capture-cdc-with-sql-selects-via-apache-nifi-flank-19m4" target="_blank"&gt;https://dev.to/tspannhw/simple-change-data-capture-cdc-with-sql-selects-via-apache-nifi-flank-19m4&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;See:&amp;nbsp; &amp;nbsp;&lt;A href="https://github.com/tspannhw/EverythingApacheNiFi" target="_blank"&gt;https://github.com/tspannhw/EverythingApacheNiFi&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.microsoft.com/en-us/sql/database-engine/availability-groups/windows/replicate-track-change-data-capture-always-on-availability?view=sql-server-ver15" target="_blank"&gt;https://docs.microsoft.com/en-us/sql/database-engine/availability-groups/windows/replicate-track-change-data-capture-always-on-availability?view=sql-server-ver15&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://debezium.io/documentation/reference/connectors/sqlserver.html" target="_blank"&gt;https://debezium.io/documentation/reference/connectors/sqlserver.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://sandeepkattepogu.medium.com/streaming-data-from-microsoft-sql-server-into-apache-kafka-2fb53282115f" target="_blank"&gt;https://sandeepkattepogu.medium.com/streaming-data-from-microsoft-sql-server-into-apache-kafka-2fb53282115f&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://www.linkedin.com/pulse/achieving-incremental-fetch-change-data-capture-via-apache-rajpal/" target="_blank"&gt;https://www.linkedin.com/pulse/achieving-incremental-fetch-change-data-capture-via-apache-rajpal/&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://www.datainmotion.dev/2021/02/using-apache-nifi-in-openshift-and.html" target="_blank"&gt;https://www.datainmotion.dev/2021/02/using-apache-nifi-in-openshift-and.html&lt;/A&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 22 Jul 2021 17:00:06 GMT</pubDate>
    <dc:creator>TimothySpann</dc:creator>
    <dc:date>2021-07-22T17:00:06Z</dc:date>
    <item>
      <title>CDC-like Nifi flow from SQL Server to Kafka</title>
      <link>https://community.cloudera.com/t5/Support-Questions/CDC-like-Nifi-flow-from-SQL-Server-to-Kafka/m-p/320250#M228125</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;We are currently fetching changes from SQL Server change tracking (CHANGETABLE), a CDC-like mechanism, and pushing them into a Kafka topic with NiFi.&lt;/P&gt;
&lt;P&gt;The flow basically works, but we would like feedback on how to make it more reliable and efficient.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Here is the SQL used to fetch the changes to MY_TABLE since offset (change tracking version) 37979520:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="markup"&gt;SELECT 
	CT.SYS_CHANGE_VERSION,
	CASE CT.SYS_CHANGE_OPERATION 
		WHEN 'D' THEN CT.T_PK 
		ELSE NULL 
	END as DeletedPK,
	CHANGE_TRACKING_CURRENT_VERSION() as LastTransactionId,
	T.*
FROM CHANGETABLE(CHANGES MY_TABLE, 37979520) CT 
LEFT JOIN MY_TABLE T on CT.T_PK = T.T_PK&lt;/LI-CODE&gt;
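&lt;P&gt;Here is a minimal Python sketch of how one row returned by this query could be turned into a change event before it is published. The event shape ("op", "key", "version", "after"), the function name and the NAME column are hypothetical. The query exposes DeletedPK but not SYS_CHANGE_OPERATION itself, so inserts and updates cannot be told apart, and the sketch labels both "upsert". For a delete, the LEFT JOIN finds no row in MY_TABLE, so every T.* column is NULL and only DeletedPK identifies the row.&lt;/P&gt;

```python
from typing import Any, Dict

# Columns added by the change-tracking query itself, not part of MY_TABLE.
TRACKING_COLUMNS = ("SYS_CHANGE_VERSION", "DeletedPK", "LastTransactionId")


def to_change_event(row: Dict[str, Any]) -> Dict[str, Any]:
    """Map one CHANGETABLE result row to a hypothetical CDC-style event."""
    version = row["SYS_CHANGE_VERSION"]
    if row.get("DeletedPK") is not None:
        # Delete: the LEFT JOIN matched nothing, so only the key is meaningful.
        return {"op": "delete", "key": row["DeletedPK"], "version": version, "after": None}
    # Insert or update: keep only the MY_TABLE columns (T.*).
    after = {k: v for k, v in row.items() if k not in TRACKING_COLUMNS}
    return {"op": "upsert", "key": after.get("T_PK"), "version": version, "after": after}


deleted = {"SYS_CHANGE_VERSION": 37979521, "DeletedPK": 42,
           "LastTransactionId": 37979530, "T_PK": None, "NAME": None}
updated = {"SYS_CHANGE_VERSION": 37979525, "DeletedPK": None,
           "LastTransactionId": 37979530, "T_PK": 7, "NAME": "foo"}
print(to_change_event(deleted))
print(to_change_event(updated))
```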
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Here is the Nifi implementation:&lt;/P&gt;
&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="nifi.png" style="width: 999px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/31829i145AF390E873FDDC/image-size/large?v=v2&amp;amp;px=999" role="button" title="nifi.png" alt="nifi.png" /&gt;&lt;/span&gt;&lt;/P&gt;
&lt;P&gt;(bigger picture in attachment)&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;To avoid threading issues, &lt;STRONG&gt;all processors run on the primary node only, with a single thread&lt;/STRONG&gt;.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Here are the steps:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;1 - &lt;U&gt;GenerateFlowFile&lt;/U&gt;&lt;/P&gt;
&lt;P&gt;To start the process, I generate a FlowFile with a custom property &lt;STRONG&gt;"increment.value" = cache_key&lt;/STRONG&gt;.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;2 -&amp;nbsp;&lt;U&gt;FetchDistributedMapCache&lt;/U&gt;&lt;/P&gt;
&lt;P&gt;I fetch the distributed map cache entry whose identifier is &lt;STRONG&gt;${increment.value}&lt;/STRONG&gt; and put the result in the &lt;STRONG&gt;"stored.state"&lt;/STRONG&gt; attribute.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;3 - &lt;U&gt;UpdateAttribute&lt;/U&gt;&lt;/P&gt;
&lt;P&gt;To manage initialization, I update "stored.state" with the following expression:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="javascript"&gt;${stored.state:isNull():ifElse(0, ${stored.state})}&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;=&amp;gt; If the cache is empty, I start from offset 0.&lt;/P&gt;
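&lt;P&gt;Steps 2 and 3 together amount to "read the stored offset, or start from 0". Below is a hedged Python sketch of that logic, with a plain dict standing in for the distributed map cache (an assumption for illustration only). The sketch also treats a blank entry as missing. The Expression Language isNull() function does not match an empty string, whereas isEmpty() does, so isEmpty() may be the safer guard if an empty value can ever reach the cache.&lt;/P&gt;

```python
from typing import Dict, Optional


def resolve_offset(cache: Dict[str, str], cache_key: str) -> int:
    """Return the stored change-tracking offset, or 0 when nothing usable is cached."""
    stored: Optional[str] = cache.get(cache_key)
    # Treat a missing or blank entry as "start from the beginning".
    if stored is None or not stored.strip():
        return 0
    return int(stored)


print(resolve_offset({}, "cache_key"))
print(resolve_offset({"cache_key": ""}, "cache_key"))
print(resolve_offset({"cache_key": "37979520"}, "cache_key"))
```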
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;4 - &lt;U&gt;ExecuteSQL&lt;/U&gt;&lt;/P&gt;
&lt;P&gt;The SQL statement fetches the changes made after the current offset:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="markup"&gt;SELECT 
	CT.SYS_CHANGE_VERSION,
	CASE CT.SYS_CHANGE_OPERATION 
		WHEN 'D' THEN CT.T_PK 
		ELSE NULL 
	END as DeletedPK,
	CHANGE_TRACKING_CURRENT_VERSION() as LastTransactionId,
	T.*
FROM CHANGETABLE(CHANGES MY_TABLE, ${stored.state}) CT 
LEFT JOIN MY_TABLE T on CT.T_PK = T.T_PK&lt;/LI-CODE&gt;
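&lt;P&gt;${stored.state} is substituted into the SQL as plain text, so a malformed value such as an empty string produces invalid SQL rather than a clear error. Below is a hedged Python sketch that validates the offset before rendering the query. It also applies the SQL Server change tracking rule that an offset older than CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('MY_TABLE')) can no longer be used: the change information has been cleaned up, and the consumer has to reload the table. This rule also applies to the initial offset 0 once cleanup has run. The helper names are hypothetical, and querying the minimum valid version is left out.&lt;/P&gt;

```python
from textwrap import dedent


def parse_offset(stored_state: str) -> int:
    """Accept only a plain non-negative integer, as substituted for ${stored.state}."""
    text = stored_state.strip()
    if not (text.isascii() and text.isdigit()):
        raise ValueError(f"invalid change-tracking offset: {stored_state!r}")
    return int(text)


def needs_full_resync(offset: int, min_valid_version: int) -> bool:
    """True when the offset is older than CHANGE_TRACKING_MIN_VALID_VERSION."""
    return min_valid_version > offset


def build_changes_query(stored_state: str, table: str = "MY_TABLE") -> str:
    """Render the step-4 query for a validated offset (table name is trusted config)."""
    offset = parse_offset(stored_state)
    return dedent(f"""\
        SELECT
            CT.SYS_CHANGE_VERSION,
            CASE CT.SYS_CHANGE_OPERATION
                WHEN 'D' THEN CT.T_PK
                ELSE NULL
            END AS DeletedPK,
            CHANGE_TRACKING_CURRENT_VERSION() AS LastTransactionId,
            T.*
        FROM CHANGETABLE(CHANGES {table}, {offset}) CT
        LEFT JOIN {table} T ON CT.T_PK = T.T_PK""")


print(build_changes_query("37979520"))
print(needs_full_resync(0, 37000000))
```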
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;5 - &lt;U&gt;QueryRecord&lt;/U&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;The &lt;STRONG&gt;"Include Zero Record FlowFiles" is set to false&lt;/STRONG&gt; to avoid dealing with incoming empty datasets.&lt;/P&gt;
&lt;P&gt;The relationship &lt;STRONG&gt;selectLastTransactionId&lt;/STRONG&gt; fetches the last offset with:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="markup"&gt;select max(LastTransactionId) as last_transaction_id from flowfile&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;The relationship &lt;STRONG&gt;selectData&lt;/STRONG&gt; forwards non-empty FlowFiles to Kafka:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="markup"&gt;select * from flowfile&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;6A - &lt;U&gt;EvaluateJsonPath&lt;/U&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;I extract &lt;STRONG&gt;$.[0].last_transaction_id&lt;/STRONG&gt; to pass the next offset to the&amp;nbsp;&lt;STRONG&gt;PutDistributedMapCache&lt;/STRONG&gt; processor.&lt;/P&gt;
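&lt;P&gt;This assumes QueryRecord writes JSON, which the JSONPath above implies. Under that assumption, the extraction in step 6A could be sketched in Python as below (the function name is hypothetical). The sketch returns None when the array is empty or last_transaction_id is null. That case is worth routing away from PutDistributedMapCache rather than writing an empty offset.&lt;/P&gt;

```python
import json
from typing import Optional


def extract_last_transaction_id(content: str) -> Optional[str]:
    """Python equivalent of $.[0].last_transaction_id that tolerates empty input."""
    records = json.loads(content) if content.strip() else []
    if not records:
        return None
    value = records[0].get("last_transaction_id")
    return None if value is None else str(value)


print(extract_last_transaction_id('[{"last_transaction_id": 37979530}]'))
print(extract_last_transaction_id('[{"last_transaction_id": null}]'))
print(extract_last_transaction_id(""))
```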
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;6B -&amp;nbsp;&lt;U&gt;PublishKafkaRecord&lt;/U&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;7A -&amp;nbsp;&lt;U&gt;PutDistributedMapCache&lt;/U&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;I update the cache with the new offset.&lt;/P&gt;
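&lt;P&gt;In the flow as described, the offset branch (6A, 7A) and the Kafka branch (6B) run independently after QueryRecord. The cache can therefore be advanced even if PublishKafkaRecord fails, which would skip those changes. Below is a Python sketch of a stricter ordering: commit the offset only after publishing succeeds. This gives at-least-once delivery, where a retry may republish rows but none are skipped. The dict cache and the fetch/publish callables are hypothetical stand-ins, and the sketch does not show how to wire this in NiFi.&lt;/P&gt;

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def run_once(cache: Dict[str, str], cache_key: str,
             fetch: Callable[[int], List[Row]],
             publish: Callable[[List[Row]], None]) -> int:
    """One polling cycle: read offset, fetch changes, publish, then commit the offset."""
    offset = int(cache.get(cache_key) or 0)
    rows = fetch(offset)
    if not rows:
        return offset  # nothing changed: leave the cache untouched
    publish(rows)  # if this raises, the offset below is never written
    new_offset = max(int(r["LastTransactionId"]) for r in rows)
    cache[cache_key] = str(new_offset)
    return new_offset


def fake_fetch(offset: int) -> List[Row]:
    # Hypothetical source: one change that is newer than any offset below 37979530.
    return [] if offset >= 37979530 else [{"T_PK": 7, "LastTransactionId": 37979530}]


sent: List[List[Row]] = []
cache: Dict[str, str] = {}
print(run_once(cache, "cache_key", fake_fetch, sent.append))
print(cache, len(sent))
```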
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;------------------------&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Here are our questions:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;- Is there a better way to safely trigger the process? A GenerateFlowFile with a long Timer Driven run schedule is not ideal.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;- Even with Include Zero Record FlowFiles = false on QueryRecord, there is still activity for empty datasets coming from the ExecuteSQL processor.&lt;/P&gt;
&lt;P&gt;It triggers many exceptions in PutDistributedMapCache:&lt;/P&gt;
&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="nifi-ex.png" style="width: 490px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/31831i95F5254241E23009/image-size/large?v=v2&amp;amp;px=999" role="button" title="nifi-ex.png" alt="nifi-ex.png" /&gt;&lt;/span&gt;&lt;/P&gt;
&lt;P&gt;How is this possible?&lt;/P&gt;
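&lt;P&gt;One possible explanation, offered as an assumption rather than a diagnosis: in standard SQL, an aggregate query without GROUP BY returns exactly one row even when its input is empty, with NULL as the value. The selectLastTransactionId query above is such a query. Its one-row result is not a zero-record FlowFile, so Include Zero Record FlowFiles = false would not drop it, and a null offset could then reach PutDistributedMapCache. The sketch below demonstrates the behavior with Python's built-in sqlite3 rather than QueryRecord, which uses Apache Calcite. It also shows a non-aggregate alternative that returns no rows on empty input. Whether QueryRecord behaves identically has not been verified here.&lt;/P&gt;

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Stand-in for the QueryRecord "flowfile" table when ExecuteSQL returned no rows.
conn.execute("CREATE TABLE flowfile (LastTransactionId INTEGER)")

aggregate = conn.execute(
    "SELECT max(LastTransactionId) AS last_transaction_id FROM flowfile").fetchall()
non_aggregate = conn.execute(
    "SELECT LastTransactionId AS last_transaction_id FROM flowfile "
    "ORDER BY LastTransactionId DESC LIMIT 1").fetchall()

print(aggregate)       # one row holding NULL
print(non_aggregate)   # no rows at all
conn.close()
```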
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;- How can we make this flow more reliable? Is there a better way to orchestrate such a flow?&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Many thanks!&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="nifi.png" style="width: 999px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/31832iBD0700762F490851/image-size/large?v=v2&amp;amp;px=999" role="button" title="nifi.png" alt="nifi.png" /&gt;&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 12 Jul 2021 18:25:04 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/CDC-like-Nifi-flow-from-SQL-Server-to-Kafka/m-p/320250#M228125</guid>
      <dc:creator>ExFabrica</dc:creator>
      <dc:date>2021-07-12T18:25:04Z</dc:date>
    </item>
    <item>
      <title>Re: CDC-like Nifi flow from SQL Server to Kafka</title>
      <link>https://community.cloudera.com/t5/Support-Questions/CDC-like-Nifi-flow-from-SQL-Server-to-Kafka/m-p/321358#M228372</link>
      <description>&lt;P&gt;You can use a QueryDatabaseTableRecord processor to watch for changes and have it trigger your process.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You may want to try Debezium with Cloudera Kafka.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You may want to try Debezium with Cloudera Flink SQL.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://dev.to/tspannhw/simple-change-data-capture-cdc-with-sql-selects-via-apache-nifi-flank-19m4" target="_blank"&gt;https://dev.to/tspannhw/simple-change-data-capture-cdc-with-sql-selects-via-apache-nifi-flank-19m4&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;See:&amp;nbsp; &amp;nbsp;&lt;A href="https://github.com/tspannhw/EverythingApacheNiFi" target="_blank"&gt;https://github.com/tspannhw/EverythingApacheNiFi&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.microsoft.com/en-us/sql/database-engine/availability-groups/windows/replicate-track-change-data-capture-always-on-availability?view=sql-server-ver15" target="_blank"&gt;https://docs.microsoft.com/en-us/sql/database-engine/availability-groups/windows/replicate-track-change-data-capture-always-on-availability?view=sql-server-ver15&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://debezium.io/documentation/reference/connectors/sqlserver.html" target="_blank"&gt;https://debezium.io/documentation/reference/connectors/sqlserver.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://sandeepkattepogu.medium.com/streaming-data-from-microsoft-sql-server-into-apache-kafka-2fb53282115f" target="_blank"&gt;https://sandeepkattepogu.medium.com/streaming-data-from-microsoft-sql-server-into-apache-kafka-2fb53282115f&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://www.linkedin.com/pulse/achieving-incremental-fetch-change-data-capture-via-apache-rajpal/" target="_blank"&gt;https://www.linkedin.com/pulse/achieving-incremental-fetch-change-data-capture-via-apache-rajpal/&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;A href="https://www.datainmotion.dev/2021/02/using-apache-nifi-in-openshift-and.html" target="_blank"&gt;https://www.datainmotion.dev/2021/02/using-apache-nifi-in-openshift-and.html&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 22 Jul 2021 17:00:06 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/CDC-like-Nifi-flow-from-SQL-Server-to-Kafka/m-p/321358#M228372</guid>
      <dc:creator>TimothySpann</dc:creator>
      <dc:date>2021-07-22T17:00:06Z</dc:date>
    </item>
  </channel>
</rss>

