Created 01-25-2018 10:20 AM
Hey,
I want to import data from an FTP server (which is updated daily) into Cassandra using Apache NiFi.
The flow works so far, but I need to update my database every day with the data from the day before.
It would work if I uploaded the whole file every day, but I want only the new lines for that specific day (the day before) as a flow file.
For example:
Today's date is 2018-01-25 and the data on my FTP server looks something like this:
date(timestamp) abc ghi xyz
2018-01-23 11:00 Null 222 44
2018-01-23 12:00 Null 222 44
2018-01-23 13:00 Null 222 44
2018-01-23 14:00 Null 222 44
2018-01-23 15:00 Null 222 44
2018-01-24 11:00 Null 222 44
2018-01-24 12:00 Null 222 44
2018-01-24 13:00 Null 222 44
2018-01-24 14:00 Null 222 44
2018-01-24 15:00 Null 222 44
Before inserting it into Cassandra I convert it from CSV to JSON, so it looks like this:
(date format: yyyyMMddHH)
[{"xyz":"44","date":"2018012311","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012312","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012313","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012314","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012315","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012411","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012412","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012413","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012414","abc":"Null","ghi":"222"},
{"xyz":"44","date":"2018012415","abc":"Null","ghi":"222"}]
Now I want to replace my flow file with (or extract from it) only the records with date "20180124", so that the following processors see just those.
I thought about getting yesterday's date with the NiFi Expression Language: "${now():toNumber():minus(86400000):format('yyyyMMdd')}" - this gives me yesterday's date.
But where do I have to set it up? Which processor is recommended?
Is it maybe possible as a regex in a ReplaceText processor? Or an ExtractText processor?
Something like:
Search value: ^.*(\{${now():toNumber():minus(86400000):format('yyyy-MM-dd')\}$ - I guess that's not how regex works, though.
My process looks like:
ListFTP -> FetchFTP -> ConvertRecord (CSV to JSON) -> ????? -> SplitJson -> EvaluateJsonPath/UpdateAttribute -> ReplaceText (insert into) -> PutCassandraQL
Any help is much appreciated!
Created on 01-25-2018 02:32 PM - edited 08-17-2019 11:21 PM
ListFTP -> FetchFTP -> ConvertRecord (CSV to JSON) -> SplitJson -> EvaluateJsonPath -> RouteOnAttribute -> ReplaceText (insert into) -> PutCassandraQL
SplitJson: splits the JSON array into individual messages.
EvaluateJsonPath: extracts the date value and keeps it as an attribute.
RouteOnAttribute: checks whether the date attribute contains yesterday's date.
SplitJson configs:
JsonPath Expression: $.*
EvaluateJsonPath configs:
Destination: flowfile-attribute
date (added property): $.date
If you want, you can add more properties, and all of those values will be added as flow file attributes.
RouteOnAttribute configs:
Routing Strategy: Route to Property name
yesterday (added property): ${date:contains("${now():toNumber():minus(86400000):format('yyyyMMdd')}")}
Connect the yesterday relationship to the next ReplaceText processor, so that RouteOnAttribute only passes on flow files whose date attribute has yesterday's date in it. The comparison uses the contains function, i.e. the expression checks whether the date attribute contains 20180124 or not.
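To make the routing check easier to reason about outside NiFi, here is a rough Python sketch of what SplitJson plus the yesterday property do. It is only an illustration, not NiFi code: the helper names yesterday_stamp and route_yesterday are made up, and it assumes the clock is in the same time zone as the data. Like the expression above, it subtracts a fixed 86400000 ms, so around daylight-saving changes the result may be off near midnight.

```python
import json
from datetime import datetime, timedelta

def yesterday_stamp(now: datetime) -> str:
    # Rough equivalent of ${now():toNumber():minus(86400000):format('yyyyMMdd')}
    return (now - timedelta(milliseconds=86_400_000)).strftime("%Y%m%d")

def route_yesterday(records, now):
    # Keep only records whose date string contains yesterday's yyyyMMdd stamp,
    # like the "yesterday" relationship of RouteOnAttribute.
    stamp = yesterday_stamp(now)
    return [r for r in records if stamp in r["date"]]

# Two records taken from the sample data in the question.
records = json.loads(
    '[{"xyz":"44","date":"2018012315","abc":"Null","ghi":"222"},'
    ' {"xyz":"44","date":"2018012411","abc":"Null","ghi":"222"}]'
)

# "now" is fixed to the date of the question so the result is reproducible.
matched = route_yesterday(records, datetime(2018, 1, 25, 10, 20))
print(matched)
```

With the clock fixed to 2018-01-25, only the record dated 2018012411 is kept.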
Then, in the ReplaceText processor, prepare your insert into statement and pass it to the PutCassandraQL processor.
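For the insert into step, the statement that ReplaceText has to produce could look like the output of this small Python sketch. The table name my_keyspace.measurements is hypothetical, the column names are assumed to match the JSON keys, and all columns are assumed to be text, so adjust it to your actual schema.

```python
def to_insert(record, table="my_keyspace.measurements"):
    # Table name is a placeholder; columns are assumed to be text columns
    # named after the JSON keys.
    cols = sorted(record)
    # Double-quote identifiers so names such as "date" are taken literally.
    col_list = ", ".join(f'"{c}"' for c in cols)
    # CQL escapes a single quote inside a string literal by doubling it.
    values = ", ".join("'" + str(record[c]).replace("'", "''") + "'" for c in cols)
    return f"INSERT INTO {table} ({col_list}) VALUES ({values});"

row = {"xyz": "44", "date": "2018012411", "abc": "Null", "ghi": "222"}
statement = to_insert(row)
print(statement)
```

In the NiFi flow itself the same string would be built in ReplaceText from the attributes extracted by EvaluateJsonPath, e.g. ${date}.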
If the answer helped to resolve your issue, click the Accept button below to accept it. That helps other community users find the solution quickly for this kind of problem.
Created 01-25-2018 04:07 PM
That works very well, thank you once again.