Created on 04-21-2018 03:37 AM - edited 09-16-2022 06:07 AM
Hi all, I need to incrementally ingest data from an RDBMS using NiFi. I know I can use the QueryDatabaseTable processor, which stores a state value on the basis of which incremental or streaming ingestion can be done. But my source query fetches data by joining 2 tables, and in that case I can't use QueryDatabaseTable since it works against only one table, while the ExecuteSQL processor has no provision to store state. Any ideas on how this can be implemented?
Created on 04-21-2018 05:54 PM - edited 08-17-2019 06:38 PM
Since you are fetching the results of a join in this case, we need to store the state ourselves in DistributedMapCache/HDFS/Hive/HBase, then pull that state value back from the same store, use it as the lower bound, and run the join incrementally.
Example:-
In this example I have emp and dept tables, with joindate (timestamp data type) in the emp table as the incremental column.
Flow:-
Flow Explanation:-
1.GenerateFlowFile processor:-
This processor acts as the trigger for the flow. Add a new property:
increment.value
cache_key
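GenerateFlowFile emits a flowfile on whatever run schedule you configure on the processor, and the dynamic property above travels with it as a flowfile attribute. As a rough illustration (the schedule value is hypothetical, pick one that suits your source):
Run Schedule = 10 min //hypothetical interval; not part of the template
increment.value = cache_key //attribute carried by every triggered flowfile, used as the cache key in the two cache processors below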
2.FetchDistributedMapCache:-
Configure and enable a DistributedMapCacheClientService (pointing at a DistributedMapCacheServer controller service) and change the property values below:
Cache Entry Identifier
${increment.value}
Distributed Cache Service
DistributedMapCacheClientService
Put Cache Value In Attribute
stored.state //if the key is found, its value is added to the flowfile as an attribute named stored.state
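To illustrate (timestamp hypothetical): on the very first run the cache key does not exist yet, so the flowfile is routed to the not-found relationship and carries no stored.state attribute; on later runs it is routed to success with the cached value attached:
First run: relationship = not-found, stored.state not set
Later runs: relationship = success, stored.state = 2018-04-21 17:54:00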
3.UpdateAttribute:-
Feed both the success and not-found relationships into the UpdateAttribute processor.
Add two properties:
current.state
${now():format("yyyy-MM-dd HH:mm:ss")} //current timestamp stored as the current.state attribute
stored.state
${stored.state:isNull():ifElse('1900-01-01 12:00:00','${stored.state}')} //if the value is null (first run) we default it to 1900-01-01 12:00:00; if a value is present we keep it as is
You can change the default value and the stored.state/current.state expressions as per your requirements.
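For illustration, assuming the very first run happens at 2018-04-21 17:54:00 and the next run an hour later (both timestamps hypothetical), the attributes resolve like this:
First run (nothing in cache yet):
stored.state = 1900-01-01 12:00:00
current.state = 2018-04-21 17:54:00
Second run (cache already holds the previous current.state):
stored.state = 2018-04-21 17:54:00
current.state = 2018-04-21 18:54:00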
4.ExecuteSQL:-
SQL select query
select e.id,d.name from emp e join dept d on e.deptid=d.deptid where e.joindate > '${stored.state}' and e.joindate <= '${current.state}'
Since joindate is the incremental column in my emp table, I use the stored.state and current.state attribute values in the WHERE clause, so each run only fetches rows whose joindate falls between the last stored state (exclusive) and the current run's timestamp (inclusive).
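With the hypothetical attribute values from the second run above, the statement that actually reaches the database would look like:
select e.id,d.name from emp e join dept d on e.deptid=d.deptid
where e.joindate > '2018-04-21 17:54:00' and e.joindate <= '2018-04-21 18:54:00'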
--
Fork the success relationship from the ExecuteSQL processor:
Fork 1: store current.state in the distributed map cache
Fork 2: further processing of the results
--
Fork 1: To store the state:-
5.ReplaceText:-
Search Value
(?s)(^.*$)
Replacement Value
${current.state}
Replacement Strategy
Always Replace
Evaluation Mode
Entire text
This discards the query results and replaces the entire flowfile content with the current.state value as the new flowfile content.
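For example, after this processor a flowfile that previously held the query results carries nothing but the timestamp string (hypothetical value):
flowfile content = 2018-04-21 18:54:00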
-
6.PutDistributedMapCache:-
Cache Entry Identifier
${increment.value}
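PutDistributedMapCache stores the flowfile content (the current.state timestamp) as the value under that key, so after this step the cache holds one entry that the next run's FetchDistributedMapCache reads back (value hypothetical):
cache_key -> 2018-04-21 18:54:00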
-
Fork 2: Further processing
e.g. processing (or) storing the results to HDFS, etc.
I have attached a sample template below; save/upload the template to your NiFi instance and change the configs as per your requirements.
Created 04-21-2018 09:21 PM
Hey @Shu, thanks a million for taking the pains to provide such a great answer, so detailed at every step. Really appreciate this!! Great stuff, thanks again.
Created 04-27-2018 08:28 AM
Done @Shu. I have accepted 🙂
Created 01-22-2019 11:27 PM
@Shu @Matt Burgess I am using NiFi 1.1 and ifElse is not available in that version. In NiFi 1.1, how can I evaluate stored.state in UpdateAttribute?
Created 01-23-2019 10:04 PM
@Shu,
I am trying to upload the above template but I am getting the error below.
Error:
"Found bundle org.apache.nifi:nifi-update-attribute-nar:1.6.0 but does not support org.apache.nifi.processors.attributes.UpdateAttribute"
Could you please confirm whether we need the nifi-update-attribute-nar NAR file?
In my requirement, I am joining 5 tables to retrieve incremental data based on record_create_date.
Data is populated into these tables every second; I need to retrieve it incrementally, and the flow should remember the last record_create_date it successfully pulled.
In the above example, if I query
e.joindate > '${stored.state}'
and
e.joindate > '${current.state}' (which holds the current time), it will never fetch new records, right?
For the distributed cache it is asking for a Server Hostname and Port; what should the server be for this?
Where am I setting the last fetched date (joindate) into ${stored.state}?
Could you please clarify me on my doubt?
Thanks,
~Sri