Support Questions

Find answers, ask questions, and share your expertise

Can executesql processor store state for incremental loading

avatar
Expert Contributor

Hi all..i need to incrementally ingest data from rdbms using nifi.now i know i can use "query database table" processor which stores a state value on the basis of which incremental or streaming ingestion can be attempted. But my source query fetches data by joining 2 tables ..now in this case i cant use query database as here i can only use only one table and executesql processor doest have the provision to store state..any ideas on how this can be implemented

1 ACCEPTED SOLUTION

avatar
Master Guru

@Abhinav Joshi

As you are Fetching results of Join in this case, We need to store the state in DistributeMapCache/HDFS/Hive/Hbase and then pull the state value from DistributeMapCache/HDFS/Hive/Hbase and use that value as lower bound, run the join incrementally.

Example:-
As i'm having emp,dept tables with joindate(timestamp datatype) as incremental column name in emp table.

Flow:-

68585-incremental-executesql-flow.png

Flow Explanation:-
1.GenerateFlowFile processor:-

This processor will be trigger for the flow and Added a new property
increment.value

cache_key

2.FetchDistributeMapCache:-
Configure and enable DistributedMapCacheClientService and change the below property values

Cache Entry Identifier

${increment.value}

Distributed Cache Service

DistributedMapCacheClientService

Put Cache Value In Attribute

stored.state //if found then the value will be added to the flowfile as a attribute with name as stored.state

68586-fetchdistributedcache.png

3.UpdateAttribute:-

We have feeded both success/not-found relations to Update Attribute processor

Add two properties

current.state

${now():format("yyyy-MM-dd HH:mm:ss")} //current timestamp as current.state attribute

stored.state

${stored.state:isNull():ifElse('1900-01-01 12:00:00','${stored.state}')} //if the value is null(in first run) then we are changing as 1900-01-01 12:00:00 and if the value is presented then we are keeping as is.

68588-updateattribute.png

You can change the default value, stored/current.state values as per your requirements.

4.ExecuteSql:-

SQL select query

select e.id,d.name from 
emp e join
dept d
on e.deptid=d.deptid
where 
 e.joindate > '${stored.state}' 
and 
 e.joindate > '${current.state}'

as my emp table having joindate as incremental column so i have used stored.state and current.state attribute values in my join where clause to run incrementally.

--
Fork the success relation from executesql processor

Fork1: to stored the current.state in distributecache

Fork2: For other processing

--
Fork1:To store the state:-

5.Replace Text:-

Search Value

(?s)(^.*$)

Replacement Value

${current.state}

Replacement Strategy

Always Replace

Evaluation Mode

Entire text

Now we are changing the flowfile content with current.state value as new flowfile content.

-
6.PutDistributeCacheMap:-

Cache Entry Identifier

${increment.value}

68589-putdistributecache.png

-
Fork2:-Further processing
like processing (or) storing the results to HDFS..etc

I have attached sample template below save/upload template to your NiFi instancce and change the configs as per your requirements.

executesql-storestate-187764.xml

View solution in original post

6 REPLIES 6

avatar
New Contributor

I can only use only one table and execute processor does have the provision to store state..any ideas on how this can be implemented

avatar
Master Guru

@Abhinav Joshi

As you are Fetching results of Join in this case, We need to store the state in DistributeMapCache/HDFS/Hive/Hbase and then pull the state value from DistributeMapCache/HDFS/Hive/Hbase and use that value as lower bound, run the join incrementally.

Example:-
As i'm having emp,dept tables with joindate(timestamp datatype) as incremental column name in emp table.

Flow:-

68585-incremental-executesql-flow.png

Flow Explanation:-
1.GenerateFlowFile processor:-

This processor will be trigger for the flow and Added a new property
increment.value

cache_key

2.FetchDistributeMapCache:-
Configure and enable DistributedMapCacheClientService and change the below property values

Cache Entry Identifier

${increment.value}

Distributed Cache Service

DistributedMapCacheClientService

Put Cache Value In Attribute

stored.state //if found then the value will be added to the flowfile as a attribute with name as stored.state

68586-fetchdistributedcache.png

3.UpdateAttribute:-

We have feeded both success/not-found relations to Update Attribute processor

Add two properties

current.state

${now():format("yyyy-MM-dd HH:mm:ss")} //current timestamp as current.state attribute

stored.state

${stored.state:isNull():ifElse('1900-01-01 12:00:00','${stored.state}')} //if the value is null(in first run) then we are changing as 1900-01-01 12:00:00 and if the value is presented then we are keeping as is.

68588-updateattribute.png

You can change the default value, stored/current.state values as per your requirements.

4.ExecuteSql:-

SQL select query

select e.id,d.name from 
emp e join
dept d
on e.deptid=d.deptid
where 
 e.joindate > '${stored.state}' 
and 
 e.joindate > '${current.state}'

as my emp table having joindate as incremental column so i have used stored.state and current.state attribute values in my join where clause to run incrementally.

--
Fork the success relation from executesql processor

Fork1: to stored the current.state in distributecache

Fork2: For other processing

--
Fork1:To store the state:-

5.Replace Text:-

Search Value

(?s)(^.*$)

Replacement Value

${current.state}

Replacement Strategy

Always Replace

Evaluation Mode

Entire text

Now we are changing the flowfile content with current.state value as new flowfile content.

-
6.PutDistributeCacheMap:-

Cache Entry Identifier

${increment.value}

68589-putdistributecache.png

-
Fork2:-Further processing
like processing (or) storing the results to HDFS..etc

I have attached sample template below save/upload template to your NiFi instancce and change the configs as per your requirements.

executesql-storestate-187764.xml

avatar
Expert Contributor

Hey @Shu thanks a million for taking the pain to provide such a great answer and too so detailed at every step. Really appreciate this !! .. great stuff ..thanks again

avatar
Expert Contributor

Done @Shu . I have accepted 🙂

avatar

@Shu @Matt Burgess I am using NiFi 1.1 and ifElse is not availabile in that. In NiFi 1.1, how can I evaluate stored.state in UpdateAttribute?

avatar

@Shu ,

I am trying to upload the above template but I am getting below error

Error :

"Found bundle org.apache.nifi:nifi-update-attribute-nar:1.6.0 but does not support org.apache.nifi.processors.attributes.UpdateAttribute"

Could you please confirm if we need nifi-update-attribute-nar nar file?

In my requirement, I am joining 5 tables to retrieve incremental data based record_create_date

every second data is populated on these tables, I need to retrieve the data incrementally and flowfile should remember the last record_create_date it successfully pulled.

in the above example if I query

e.joindate >'${stored.state}'

and

e.joindate >'${current.state}' (it has current time), it will never fetch new records, right?

For distributed cache it is asking for Server Hostname and port, what should be the server for this?

Where I am setting the last fetched date (joindate) to ${stored.state}

Could you please clarify me on my doubt?

Thanks,

~Sri