Created on 07-09-2021 05:48 AM
		
					
Hello,

We are currently trying to fetch changes from SQL Server change tracking tables and push them into a Kafka topic with NiFi.
The flow is basically working, but we would like some feedback on how to make it more reliable and better optimized.

Here is the SQL used to fetch changes from MY_TABLE starting at offset 37979520:
   
   
 SELECT 
	CT.SYS_CHANGE_VERSION,
	CASE CT.SYS_CHANGE_OPERATION 
		WHEN 'D' THEN CT.T_PK 
		ELSE NULL 
	END as DeletedPK,
	CHANGE_TRACKING_CURRENT_VERSION() as LastTransactionId,
	T.*
FROM CHANGETABLE(CHANGES MY_TABLE, 37979520) CT 
LEFT JOIN MY_TABLE T on CT.T_PK = T.T_PK 
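
For reference, this query assumes change tracking is already enabled on the database and on MY_TABLE, with something like the following (the database name is just a placeholder):

ALTER DATABASE MyDatabase
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);

ALTER TABLE MY_TABLE
ENABLE CHANGE_TRACKING
WITH (TRACK_COLUMNS_UPDATED = OFF);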
   
   
Here is the NiFi implementation:

(bigger picture in attachment)

To avoid threading issues, all processors are scheduled on the primary node only, with a single concurrent task.
   
 Here are the steps 
   
 1 - GenerateFlowFile 
To start the process, I generate a FlowFile with a custom property "increment.value" set to the cache key.
   
2 - FetchDistributedMapCache
I fetch the distributed cache entry with identifier ${increment.value} and put the result in the "stored.state" attribute.
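
Roughly, the processor is configured like this (the cache service is simply our DistributedMapCacheClientService instance):

FetchDistributedMapCache
	Cache Entry Identifier       : ${increment.value}
	Distributed Cache Service    : DistributedMapCacheClientService
	Put Cache Value In Attribute : stored.state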
   
 3 - UpdateAttribute 
To handle the first run, I update "stored.state" with the following expression:
   
 ${stored.state:isNull():ifElse(0, ${stored.state})} 
   
=> If the cache is empty, I start from 0.
   
 4 - ExecuteSQL 
SQL statement to fetch data from the current offset:
   
   
 SELECT 
	CT.SYS_CHANGE_VERSION,
	CASE CT.SYS_CHANGE_OPERATION 
		WHEN 'D' THEN CT.T_PK 
		ELSE NULL 
	END as DeletedPK,
	CHANGE_TRACKING_CURRENT_VERSION() as LastTransactionId,
	T.*
FROM CHANGETABLE(CHANGES MY_TABLE, ${stored.state}) CT 
LEFT JOIN MY_TABLE T on CT.T_PK = T.T_PK 
   
   
5 - QueryRecord

"Include Zero Record FlowFiles" is set to false to avoid having to deal with empty incoming datasets.
The selectLastTransactionId relationship fetches the last offset with:

select max(LastTransactionId) as last_transaction_id from flowfile

The selectData relationship forwards the non-empty FlowFiles to Kafka:

select * from flowfile
   
   
6A - EvaluateJsonPath

I extract $.[0].last_transaction_id to forward the next offset to the PutDistributedMapCache processor.
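
Assuming a JSON record writer on the selectLastTransactionId relationship, the FlowFile content looks roughly like this (the version number is only illustrative), which is what the $.[0].last_transaction_id path reads:

[ { "last_transaction_id" : 37979620 } ]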
   
 6B - PublishKafkaRecord 
   
 7A - PutDistributedMapCache 
   
 I update the cache with the new offset. 
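
Since PutDistributedMapCache stores the FlowFile content as the cache value, EvaluateJsonPath presumably writes the extracted version into the FlowFile content. Roughly, the configuration is:

PutDistributedMapCache
	Cache Entry Identifier    : ${increment.value}
	Distributed Cache Service : DistributedMapCacheClientService
	Cache update strategy     : Replace if present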
   
 ------------------------ 
   
Here are some questions:

- Is there a better way to safely start the process? A GenerateFlowFile with a long Timer Driven Run Schedule is not ideal.

- Despite "Include Zero Record FlowFiles" being set to false on the QueryRecord, I still see activity for empty datasets coming out of the ExecuteSQL processor, which triggers many exceptions at the PutDistributedMapCache level. How is this possible?

- How can we make this flow more reliable? Is there a better way to orchestrate such a flow?

Many thanks!
      
						
					
Labels: Apache Kafka, Apache NiFi