Can executesql processor store state for incremental loading

Expert Contributor

Hi all. I need to incrementally ingest data from an RDBMS using NiFi. I know I can use the QueryDatabaseTable processor, which stores a state value that makes incremental or streaming ingestion possible. But my source query fetches data by joining two tables, so I can't use QueryDatabaseTable, which works on only one table, and the ExecuteSQL processor has no provision to store state. Any ideas on how this can be implemented?

Accepted Solutions

Super Guru

@Abhinav Joshi

Because you are fetching the results of a join, you need to store the state yourself in DistributedMapCache, HDFS, Hive, or HBase. On each run, pull the stored value back, use it as the lower bound, and run the join incrementally.
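To make the idea concrete, here is a minimal Python sketch of the store-and-fetch cycle described above. It is not NiFi code: the `state_store` dict is a hypothetical stand-in for the external store, and `run_increment` is an illustrative name. The upper bound is assumed to be inclusive so that every row lands in exactly one run.

```python
from datetime import datetime

# Hypothetical stand-in for the external state store (DistributedMapCache, HDFS, ...).
state_store = {}

def run_increment(rows, now, key="cache_key", default="1900-01-01 12:00:00"):
    """Return rows in the window (stored state, now] and advance the stored state."""
    lower = state_store.get(key, default)       # previous state, or the default on the first run
    upper = now.strftime("%Y-%m-%d %H:%M:%S")   # this run's upper bound
    batch = [r for r in rows if lower < r["joindate"] <= upper]
    state_store[key] = upper                    # becomes the lower bound of the next run
    return batch

rows = [
    {"id": 1, "joindate": "2018-04-01 09:00:00"},
    {"id": 2, "joindate": "2018-04-02 09:00:00"},
]
first = run_increment(rows, datetime(2018, 4, 1, 12, 0, 0))
second = run_increment(rows, datetime(2018, 4, 3, 12, 0, 0))
```

Here `first` holds only the row with id 1 and `second` only the row with id 2, because the second run starts from the state saved by the first.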

Example:-
I have emp and dept tables, with joindate (timestamp datatype) in the emp table as the incremental column.

Flow:-

68585-incremental-executesql-flow.png

Flow Explanation:-
1.GenerateFlowFile processor:-

This processor is the trigger for the flow. Add a new property:
increment.value

cache_key

2.FetchDistributedMapCache:-
Configure and enable a DistributedMapCacheClientService, then set the property values below

Cache Entry Identifier

${increment.value}

Distributed Cache Service

DistributedMapCacheClientService

Put Cache Value In Attribute

stored.state //if the key is found, the cached value is added to the flowfile as an attribute named stored.state

68586-fetchdistributedcache.png

3.UpdateAttribute:-

Feed both the success and not-found relationships into the UpdateAttribute processor

Add two properties

current.state

${now():format("yyyy-MM-dd HH:mm:ss")} //current timestamp as current.state attribute

stored.state

${stored.state:isNull():ifElse('1900-01-01 12:00:00','${stored.state}')} //if the value is null (as on the first run), we default it to 1900-01-01 12:00:00; if a value is present, we keep it as is.
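As a rough illustration of what these two properties do, here is a small Python sketch that applies the same logic to a plain dict of attributes. The function name `resolve_states` and the dict representation are assumptions made for the example; NiFi itself evaluates the Expression Language shown above.

```python
from datetime import datetime

DEFAULT_STATE = "1900-01-01 12:00:00"  # same fallback value as in the property above

def resolve_states(attributes, now):
    """Mimic the two UpdateAttribute properties on a dict of flowfile attributes."""
    updated = dict(attributes)
    # current.state: the current timestamp, formatted like yyyy-MM-dd HH:mm:ss
    updated["current.state"] = now.strftime("%Y-%m-%d %H:%M:%S")
    # stored.state: keep the cached value, or fall back to the default when it is missing
    if updated.get("stored.state") is None:
        updated["stored.state"] = DEFAULT_STATE
    return updated

now = datetime(2018, 4, 2, 12, 0, 0)
first_run = resolve_states({"increment.value": "cache_key"}, now)
later_run = resolve_states({"stored.state": "2018-04-01 12:00:00"}, now)
```

On the first run nothing is cached, so `stored.state` becomes the default. On later runs the cached value passes through unchanged.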

68588-updateattribute.png

You can change the default value and the stored.state/current.state values as per your requirements.

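4.ExecuteSQL:-

SQL select query

select e.id,d.name from 
emp e join
dept d
on e.deptid=d.deptid
where 
 e.joindate > '${stored.state}' 
and 
 e.joindate <= '${current.state}'

My emp table has joindate as the incremental column, so I have used the stored.state and current.state attribute values in the where clause of the join to run it incrementally. The upper bound must be <= '${current.state}': with > on both bounds, the query would only match rows dated after the current time and would never fetch new records.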
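The windowed join can be tried outside NiFi with a small Python sketch against an in-memory SQLite database. The sample rows, the `fetch_window` helper, and the use of bound parameters instead of Expression Language substitution are all assumptions for illustration. The sketch uses `>` for the lower bound and `<=` for the upper bound so that each row falls into exactly one window.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dept (deptid INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE emp  (id INTEGER PRIMARY KEY, deptid INTEGER, joindate TEXT);
    INSERT INTO dept VALUES (10, 'sales'), (20, 'hr');
    INSERT INTO emp VALUES
        (1, 10, '2018-04-01 09:00:00'),
        (2, 20, '2018-04-02 09:00:00'),
        (3, 10, '2018-04-03 09:00:00');
""")

# Same join as in the post, with the two state values passed as parameters.
QUERY = """
    SELECT e.id, d.name
    FROM emp e JOIN dept d ON e.deptid = d.deptid
    WHERE e.joindate > ? AND e.joindate <= ?
    ORDER BY e.id
"""

def fetch_window(stored_state, current_state):
    """Return joined rows with stored_state < joindate <= current_state."""
    return conn.execute(QUERY, (stored_state, current_state)).fetchall()

first_run = fetch_window("1900-01-01 12:00:00", "2018-04-02 12:00:00")
second_run = fetch_window("2018-04-02 12:00:00", "2018-04-03 12:00:00")
```

The first run returns employees 1 and 2. The second run, which starts from the first run's upper bound, returns only employee 3.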

--
Fork the success relationship from the ExecuteSQL processor

Fork1: to store the current.state in the distributed cache

Fork2: for other processing

--
Fork1:To store the state:-

5.ReplaceText:-

Search Value

(?s)(^.*$)

Replacement Value

${current.state}

Replacement Strategy

Always Replace

Evaluation Mode

Entire text

This replaces the flowfile content with the current.state value, so the state becomes the new flowfile content.
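To see what the search pattern matches, here is a short Python sketch. NiFi uses Java regular expressions, so this only approximates the behaviour, and `replace_content` is a hypothetical helper name.

```python
import re

def replace_content(content, current_state):
    """Replace the whole content with the state value, as the ReplaceText step does."""
    # (?s) lets '.' match newlines, so ^.*$ spans the entire content in one match.
    # A function is used as the replacement so the state value is inserted literally.
    return re.sub(r"(?s)(^.*$)", lambda _match: current_state, content)

new_content = replace_content("id,name\n1,sales\n2,hr\n", "2018-04-02 12:00:00")
```

Whatever ExecuteSQL returned, the content that reaches the cache step is just the timestamp string.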

-
6.PutDistributedMapCache:-

Cache Entry Identifier

${increment.value}

68589-putdistributecache.png

-
Fork2:-Further processing
such as transforming the results or storing them to HDFS, etc.

I have attached a sample template below. Save it, upload it to your NiFi instance, and change the configs as per your requirements.

executesql-storestate-187764.xml

6 REPLIES

New Contributor

I can use only one table, and the ExecuteSQL processor does not have the provision to store state. Any ideas on how this can be implemented?

Expert Contributor

Hey @Shu, thanks a million for taking the pains to provide such a great answer, and so detailed at every step too. Really appreciate this! Great stuff, thanks again.

Expert Contributor

Done @Shu . I have accepted 🙂

New Contributor

@Shu @Matt Burgess I am using NiFi 1.1 and ifElse is not available in that version. In NiFi 1.1, how can I evaluate stored.state in UpdateAttribute?

@Shu ,

I am trying to upload the above template but I am getting the error below

Error :

"Found bundle org.apache.nifi:nifi-update-attribute-nar:1.6.0 but does not support org.apache.nifi.processors.attributes.UpdateAttribute"

Could you please confirm whether we need the nifi-update-attribute-nar NAR file?

In my requirement, I am joining 5 tables to retrieve incremental data based on record_create_date.

Data is populated in these tables every second. I need to retrieve the data incrementally, and the flow should remember the last record_create_date it successfully pulled.

in the above example if I query

e.joindate >'${stored.state}'

and

e.joindate >'${current.state}' (it has current time), it will never fetch new records, right?

For the distributed cache it asks for a Server Hostname and port. What should the server be for this?

Where am I setting the last fetched date (joindate) to ${stored.state}?

Could you please clarify my doubts?

Thanks,

~Sri