Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.


@Abhinav Joshi

As you are fetching the results of a join in this case, you need to store the state yourself in DistributedMapCache/HDFS/Hive/HBase, then pull the stored value back from DistributedMapCache/HDFS/Hive/HBase, use that value as the lower bound, and run the join incrementally.

Example:
I have emp and dept tables, with joindate (timestamp datatype) as the incremental column in the emp table.

Flow:

[Screenshot: incremental ExecuteSQL flow]

Flow Explanation:
1. GenerateFlowFile:

This processor triggers the flow. Add a new property:

increment.value = cache_key

2. FetchDistributedMapCache:
Configure and enable a DistributedMapCacheClientService (it needs a running DistributedMapCacheServer to connect to), then set the following property values:

Cache Entry Identifier = ${increment.value}
Distributed Cache Service = DistributedMapCacheClientService
Put Cache Value In Attribute = stored.state

If the key is found, the cached value is added to the flowfile as an attribute named stored.state and the flowfile is routed to success; if it is not found, the flowfile is routed to not-found.

[Screenshot: FetchDistributedMapCache configuration]
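The fetch step behaves like a keyed lookup with two outcomes. As a rough illustration only, the Python sketch below models the distributed cache as a plain dict; the function name fetch_distributed_map_cache and its parameters are hypothetical and merely mimic the routing described above, they are not part of any NiFi API.

```python
from typing import Dict, Tuple


def fetch_distributed_map_cache(
    cache: Dict[str, str],
    attributes: Dict[str, str],
    key_attribute: str = "increment.value",
    target_attribute: str = "stored.state",
) -> Tuple[str, Dict[str, str]]:
    """Hypothetical stand-in for the fetch step: returns (relationship, attributes)."""
    key = attributes[key_attribute]      # Cache Entry Identifier = ${increment.value}
    out = dict(attributes)               # copy, so the input attributes are not mutated
    if key not in cache:
        return "not-found", out          # no stored.state attribute is added
    out[target_attribute] = cache[key]   # Put Cache Value In Attribute = stored.state
    return "success", out


# First run: empty cache, so the flowfile is routed to not-found.
print(fetch_distributed_map_cache({}, {"increment.value": "cache_key"}))
# Later runs: the stored timestamp becomes the stored.state attribute.
print(fetch_distributed_map_cache({"cache_key": "2018-04-01 00:00:00"},
                                  {"increment.value": "cache_key"}))
```

On the first run the cache is empty, so the flowfile arrives without a stored.state attribute; that is why both relationships have to feed the next step.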

3. UpdateAttribute:

Route both the success and not-found relationships from FetchDistributedMapCache to the UpdateAttribute processor and add two properties:

current.state = ${now():format("yyyy-MM-dd HH:mm:ss")}
(the current timestamp, stored as the current.state attribute)

stored.state = ${stored.state:isNull():ifElse('1900-01-01 12:00:00','${stored.state}')}
(if the value is null, as on the first run, it is set to 1900-01-01 12:00:00; if a value is present, it is kept as is)

[Screenshot: UpdateAttribute configuration]

You can change the default value and the stored.state/current.state expressions as per your requirements.
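The two UpdateAttribute expressions can be approximated in Python as follows. This is a sketch that assumes the NiFi host, Python and the database all use the same local time zone; update_state_attributes is a hypothetical helper, and its now argument exists only to make the result repeatable.

```python
from datetime import datetime
from typing import Dict, Optional

DEFAULT_STATE = "1900-01-01 12:00:00"  # first-run lower bound, as in the stored.state expression
TS_FORMAT = "%Y-%m-%d %H:%M:%S"        # Python equivalent of yyyy-MM-dd HH:mm:ss


def update_state_attributes(
    attributes: Dict[str, str], now: Optional[datetime] = None
) -> Dict[str, str]:
    """Hypothetical equivalent of the two UpdateAttribute properties."""
    out = dict(attributes)
    # current.state = ${now():format("yyyy-MM-dd HH:mm:ss")}; local time is assumed
    out["current.state"] = (now or datetime.now()).strftime(TS_FORMAT)
    # stored.state: keep the fetched value, or fall back to the default when it is missing
    if out.get("stored.state") is None:
        out["stored.state"] = DEFAULT_STATE
    return out


print(update_state_attributes({}, datetime(2018, 4, 2, 9, 30)))
print(update_state_attributes({"stored.state": "2018-04-01 00:00:00"}, datetime(2018, 4, 2, 9, 30)))
```

Keeping the same fixed format on both sides matters: the SQL step compares joindate against these strings, so a format or time-zone mismatch would shift the window.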

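4. ExecuteSQL:

SQL select query:

select e.id, d.name from
emp e join
dept d
on e.deptid = d.deptid
where
 e.joindate > '${stored.state}'
and
 e.joindate <= '${current.state}'

As my emp table has joindate as the incremental column, I have used the stored.state and current.state attribute values in the join's where clause to run it incrementally: each run returns only rows with stored.state < joindate <= current.state, i.e. rows that joined after the previous run and up to the current run's timestamp.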
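To check the window logic outside NiFi, here is a small sketch against an in-memory SQLite database with made-up emp/dept rows. It assumes joindate is stored as 'yyyy-MM-dd HH:mm:ss' text so that string comparison orders correctly, passes the two bounds as bind parameters instead of NiFi's textual ${...} substitution, and adds an order by only to make the output deterministic; the sample data and helper names are hypothetical.

```python
import sqlite3
from typing import List, Tuple

# Same join as above, with the two attribute values passed as bind parameters.
INCREMENTAL_JOIN = """
select e.id, d.name
from emp e
join dept d on e.deptid = d.deptid
where e.joindate > ?    -- lower bound: stored.state (exclusive)
  and e.joindate <= ?   -- upper bound: current.state (inclusive)
order by e.id
"""


def run_incremental_join(
    conn: sqlite3.Connection, stored_state: str, current_state: str
) -> List[Tuple[int, str]]:
    """Return the (id, name) rows whose joindate lies in (stored_state, current_state]."""
    return conn.execute(INCREMENTAL_JOIN, (stored_state, current_state)).fetchall()


def demo_connection() -> sqlite3.Connection:
    """In-memory database with made-up sample rows (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        create table dept (deptid integer primary key, name text);
        create table emp (id integer primary key, deptid integer, joindate text);
        insert into dept values (10, 'sales'), (20, 'hr');
        insert into emp values
            (1, 10, '2018-01-15 09:00:00'),
            (2, 20, '2018-03-01 09:00:00'),
            (3, 10, '2018-05-20 09:00:00');
        """
    )
    return conn


if __name__ == "__main__":
    conn = demo_connection()
    print(run_incremental_join(conn, "1900-01-01 12:00:00", "2018-04-01 00:00:00"))
    print(run_incremental_join(conn, "2018-04-01 00:00:00", "2018-06-01 00:00:00"))
```

The first window returns employees 1 and 2, and the next window returns only employee 3, so consecutive windows neither overlap nor skip rows.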

--
Fork the success relationship from the ExecuteSQL processor:

Fork1: to store current.state in the distributed cache.

Fork2: for other processing.

--
Fork1: To store the state:

5. ReplaceText:

Search Value = (?s)(^.*$)
Replacement Value = ${current.state}
Replacement Strategy = Always Replace
Evaluation Mode = Entire text

This replaces the flowfile content with the current.state value, so the new flowfile content is just the timestamp to be stored. (With the Always Replace strategy, the Search Value is ignored.)
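In Python terms, this ReplaceText configuration amounts to swapping the whole content for the timestamp. The sketch below reuses the same regular expression; replace_content is a hypothetical helper, and it treats the content as text, whereas the real ExecuteSQL output is binary Avro.

```python
import re

# Same pattern as the Search Value: (?s) lets '.' match newlines, and without
# MULTILINE '^' and '$' anchor to the start and end of the whole content.
WHOLE_CONTENT = re.compile(r"(?s)(^.*$)")


def replace_content(content: str, current_state: str) -> str:
    """Hypothetical helper: replace the entire content with current_state."""
    # A function replacement avoids any backslash or group-reference
    # interpretation of the replacement text; count=1 replaces only once.
    return WHOLE_CONTENT.sub(lambda _match: current_state, content, count=1)


print(replace_content("id,name\n1,sales\n", "2018-04-02 09:30:00"))
```

Because the content after this step is only the timestamp, the next processor can store it as the cache value without any further parsing.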

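-
6. PutDistributedMapCache:

Cache Entry Identifier = ${increment.value}
Distributed Cache Service = DistributedMapCacheClientService

PutDistributedMapCache stores the flowfile content (the current.state timestamp) under the same cache key, so the next run fetches it as stored.state and uses it as the new lower bound.

[Screenshot: PutDistributedMapCache configuration]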

-
Fork2: Further processing,
such as transforming the results or storing them in HDFS, etc.
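Putting the steps together, the following Python sketch simulates repeated runs of the flow, with a dict standing in for the distributed cache and SQLite standing in for the source database. The names, sample rows and single-function structure are assumptions for illustration; in NiFi the two forks run as separate branches rather than sequential statements.

```python
import sqlite3
from datetime import datetime
from typing import Dict, List, Tuple

CACHE_KEY = "cache_key"                # value of increment.value
DEFAULT_STATE = "1900-01-01 12:00:00"  # first-run lower bound from UpdateAttribute
QUERY = """
select e.id, d.name
from emp e join dept d on e.deptid = d.deptid
where e.joindate > ? and e.joindate <= ?
order by e.id
"""


def build_demo_db() -> sqlite3.Connection:
    """In-memory stand-in for the source database (made-up sample rows)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        create table dept (deptid integer primary key, name text);
        create table emp (id integer primary key, deptid integer, joindate text);
        insert into dept values (10, 'sales'), (20, 'hr');
        insert into emp values (1, 10, '2018-01-15 09:00:00'),
                               (2, 20, '2018-03-01 09:00:00');
        """
    )
    return conn


def run_flow_once(
    conn: sqlite3.Connection, cache: Dict[str, str], now: datetime
) -> List[Tuple[int, str]]:
    # Fetch + UpdateAttribute: use the cached timestamp, or the default on the first run.
    stored_state = cache.get(CACHE_KEY, DEFAULT_STATE)
    current_state = now.strftime("%Y-%m-%d %H:%M:%S")
    # ExecuteSQL: only rows that joined inside (stored_state, current_state].
    rows = conn.execute(QUERY, (stored_state, current_state)).fetchall()
    # Fork1 (ReplaceText + PutDistributedMapCache): persist the new upper bound.
    # This line is reached only if the query did not raise.
    cache[CACHE_KEY] = current_state
    # Fork2: hand the rows to downstream processing.
    return rows


if __name__ == "__main__":
    conn = build_demo_db()
    cache: Dict[str, str] = {}
    print(run_flow_once(conn, cache, datetime(2018, 4, 1)))   # [(1, 'sales'), (2, 'hr')]
    conn.execute("insert into emp values (3, 20, '2018-04-10 08:00:00')")
    print(run_flow_once(conn, cache, datetime(2018, 4, 15)))  # [(3, 'hr')]
    print(run_flow_once(conn, cache, datetime(2018, 4, 16)))  # []
```

Because the state is written only after the query succeeds, a failed run does not advance the lower bound, and the next run retries the same window.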

I have attached a sample template below. Save/upload the template to your NiFi instance and change the configs as per your requirements.

executesql-storestate-187764.xml

