@Abhinav Joshi

Because you are fetching the results of a join, you need to store the state yourself in DistributedMapCache/HDFS/Hive/HBase, then pull the stored value back from there and use it as the lower bound so that the join runs incrementally.

Example:-
I have emp and dept tables, with joindate (timestamp datatype) as the incremental column in the emp table.

Flow:-

68585-incremental-executesql-flow.png

Flow Explanation:-
1.GenerateFlowFile processor:-

This processor is the trigger for the flow. Add a new property (name, then value):
increment.value

cache_key

2.FetchDistributedMapCache:-
Configure and enable a DistributedMapCacheClientService, then change the property values below:

Cache Entry Identifier

${increment.value}

Distributed Cache Service

DistributedMapCacheClientService

Put Cache Value In Attribute

stored.state //if the key is found, its value is added to the flowfile as an attribute named stored.state

68586-fetchdistributedcache.png

3.UpdateAttribute:-

Feed both the success and not-found relationships to the UpdateAttribute processor.

Add two properties

current.state

${now():format("yyyy-MM-dd HH:mm:ss")} //current timestamp as current.state attribute

stored.state

${stored.state:isNull():ifElse('1900-01-01 12:00:00','${stored.state}')} //if the value is null (on the first run) it is set to 1900-01-01 12:00:00; if a value is present it is kept as is.

68588-updateattribute.png

You can change the default value and the stored.state/current.state values as per your requirements.
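To make the two UpdateAttribute expressions concrete, here is a minimal Python sketch of the same logic. It is only an illustration, not part of the NiFi flow; the function name resolve_state is hypothetical, and None stands in for an attribute that is missing on the first run.

```python
from datetime import datetime

# Same fallback value as in the stored.state expression above.
DEFAULT_STATE = "1900-01-01 12:00:00"
# Python equivalent of the yyyy-MM-dd HH:mm:ss pattern used for current.state.
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def resolve_state(stored_state, now):
    """Return (stored.state, current.state) as UpdateAttribute would set them.

    stored_state is None when the cache lookup found nothing (first run).
    """
    current_state = now.strftime(TS_FORMAT)
    if stored_state is None:
        stored_state = DEFAULT_STATE
    return stored_state, current_state
```

On the first run resolve_state(None, datetime.now()) falls back to the default lower bound; on later runs the cached value is passed through unchanged.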

4.ExecuteSQL:-

SQL select query

select e.id,d.name from 
emp e join
dept d
on e.deptid=d.deptid
where 
 e.joindate > '${stored.state}' 
and 
 e.joindate <= '${current.state}'

Because joindate is the incremental column in my emp table, I have used the stored.state and current.state attribute values in the where clause of the join so that it runs incrementally.
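The windowing idea can be tried outside NiFi with a small Python sketch. This assumes the intended window is stored.state < joindate <= current.state, uses an in-memory SQLite database as a stand-in for the real one, and the sample rows and the function names fetch_increment and build_sample_db are made up for illustration.

```python
import sqlite3


def build_sample_db():
    """Create an in-memory database with hypothetical emp/dept rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table dept (deptid integer, name text);
        create table emp (id integer, deptid integer, joindate text);
        insert into dept values (10, 'sales'), (20, 'hr');
        insert into emp values
            (1, 10, '2018-03-30 08:00:00'),
            (2, 20, '2018-04-01 09:30:00'),
            (3, 10, '2018-04-02 11:00:00');
    """)
    return conn


def fetch_increment(conn, stored_state, current_state):
    """Run the join only for rows inside (stored_state, current_state]."""
    sql = """
        select e.id, d.name
        from emp e join dept d on e.deptid = d.deptid
        where e.joindate > ? and e.joindate <= ?
        order by e.id
    """
    return conn.execute(sql, (stored_state, current_state)).fetchall()
```

Calling fetch_increment first with the default lower bound and then with the previous upper bound as the new lower bound returns each emp row exactly once across the two runs.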

--
Fork the success relationship from the ExecuteSQL processor

Fork1: to store the current.state in the distributed cache

Fork2: For other processing

--
Fork1:To store the state:-

5.ReplaceText:-

Search Value

(?s)(^.*$)

Replacement Value

${current.state}

Replacement Strategy

Always Replace

Evaluation Mode

Entire text

This replaces the flowfile content with the current.state value, so the new flowfile content is the timestamp to be cached.

-
6.PutDistributedMapCache:-

Cache Entry Identifier

${increment.value}

68589-putdistributecache.png
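The fetch, query, and put steps together form a simple watermark loop. The Python sketch below shows that loop under stated assumptions: a plain dict stands in for the distributed map cache, rows are hypothetical (id, joindate) tuples, and run_once is an illustrative name, not a NiFi API.

```python
CACHE_KEY = "cache_key"                 # value of increment.value in the flow
DEFAULT_STATE = "1900-01-01 12:00:00"   # lower bound used on the first run


def run_once(cache, current_state, rows):
    """Simulate one pass of the flow and return the rows it would pick up."""
    # FetchDistributedMapCache + UpdateAttribute: read the state or fall back.
    stored_state = cache.get(CACHE_KEY, DEFAULT_STATE)
    # ExecuteSQL: keep only rows inside (stored_state, current_state].
    picked = [r for r in rows if stored_state < r[1] <= current_state]
    # ReplaceText + PutDistributedMapCache: save the upper bound as new state.
    cache[CACHE_KEY] = current_state
    return picked
```

Each call moves the stored state forward to the upper bound it just used, so a row is returned by at most one run.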

-
Fork2:-Further processing
For example, processing the results further or storing them to HDFS, etc.

I have attached a sample template below. Save it, upload it to your NiFi instance, and change the configs as per your requirements.

executesql-storestate-187764.xml
