Support Questions

Find answers, ask questions, and share your expertise

Inserting data (daily) of a specific date in Cassandra via Apache NIFI

avatar
Contributor

Hey,

I want to import data of an ftp (which is updated daily) to cassandra by using Apache NIFI.

I got the process so far, but i need to update my database every day with data of the day before.

It would work when I m uploading the whole file every day, but I want to have just the new lines of this specific day(the day before) as a flow file.

For example:

Todays date is 2018-01-25 and the data on my ftp looks something like this:

date(timestamp) abc ghi xyz
2018-01-23 11:00 Null 222 44
2018-01-23 12:00 Null 222 44
2018-01-23 13:00 Null 222 44
2018-01-23 14:00 Null 222 44
2018-01-23 15:00 Null 222 44
2018-01-24 11:00 Null 222 44
2018-01-24 12:00 Null 222 44
2018-01-24 13:00 Null 222 44
2018-01-24 14:00 Null 222 44
2018-01-24 15:00 Null 222 44

Before inserting it to cassandra I convert it from csv to json, so it looks like this:
(date : YYYYmmDDhh)

[{"xyz":"44","date":"2018012311","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012312","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012313","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012314","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012315","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012411","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012412","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012413","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012414","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012415","abc":"Null","ghi":"222"}]

I want to replace my flowfile/extract now all data of date "20180124" for my following processors.

I thought about using a possibility to get the date one day ago by using the nifi express language: "${now():toNumber():minus(86400000):format('yyyyMMdd')}" - this shows me the date of yesterday.
But where do I have to set it up. What processor is recommended?

Is it maybe possible as a regex in a Replace Text processor? Or an Extract Text Processor?
Something like:
Search value: ^.*(\{${now():toNumber():minus(86400000):format('yyyy-MM-dd')\}$ - I guess thats not how regex works though.

My process looks like:

ListFTP -> FetchFTP -> ConvertRecord (CSV to JSON) -> ????? -> SplitJSON -> EvaluateJSONPath/UpdateAttribute -> ReplaceText (insert into) -> PUTCassandraQL


Any help is much appreciated!

1 ACCEPTED SOLUTION

avatar
Master Guru
@Salda Murrah
ListFTP -> FetchFTP -> ConvertRecord (CSV to JSON)
-> SplitJSON //split the json array to individual messages
-> EvaluateJSONPath //extract date value and keep it as attribute
-> RouteonAttribute //check date attribute value is it contains last day date or not
-> ReplaceText (insert into) -> PUTCassandraQL

SplitJson Configs:-

JsonPath Expression

$.*

56456-splitjson.png

EvaluateJsonPath Configs:-

Destination

flowfile-attribute

date

$.date

56457-eval-json.png

if you want you can add more properties and all those values will be added as flowfile attributes.

Routeonattribute Configs:-

Routing Strategy

Route to Property name

yesterday

${date:contains("${now():toNumber():minus(86400000):format('yyyyMMdd')}")}

56458-routeonattribute.png

Connect Yesterday relation to next Replace text processor so route on attribute processor only gives date attribute that having yesterday's date in it(we are comparing by using contain function i.e expression checks if the date attribute contains 20180124 or not).

Then in replace text processor prepare your insert into statement then use PUTCassandraQL processor.

.

If the Answer helped to resolve your issue, Click on Accept button below to accept the answer, That would be great help to Community users to find solution quickly for these kind of errors.

View solution in original post

2 REPLIES 2

avatar
Master Guru
@Salda Murrah
ListFTP -> FetchFTP -> ConvertRecord (CSV to JSON)
-> SplitJSON //split the json array to individual messages
-> EvaluateJSONPath //extract date value and keep it as attribute
-> RouteonAttribute //check date attribute value is it contains last day date or not
-> ReplaceText (insert into) -> PUTCassandraQL

SplitJson Configs:-

JsonPath Expression

$.*

56456-splitjson.png

EvaluateJsonPath Configs:-

Destination

flowfile-attribute

date

$.date

56457-eval-json.png

if you want you can add more properties and all those values will be added as flowfile attributes.

Routeonattribute Configs:-

Routing Strategy

Route to Property name

yesterday

${date:contains("${now():toNumber():minus(86400000):format('yyyyMMdd')}")}

56458-routeonattribute.png

Connect Yesterday relation to next Replace text processor so route on attribute processor only gives date attribute that having yesterday's date in it(we are comparing by using contain function i.e expression checks if the date attribute contains 20180124 or not).

Then in replace text processor prepare your insert into statement then use PUTCassandraQL processor.

.

If the Answer helped to resolve your issue, Click on Accept button below to accept the answer, That would be great help to Community users to find solution quickly for these kind of errors.

avatar
Contributor

Thats works very well, thank you once again.