Created 11-07-2017 10:33 AM
Hi folks,
I have a dataflow where I convert a text file with CSV structure into a JSON record.
After the file is converted, I split the output flowfile at $.*
and then use the EvaluateJsonPath processor to extract attributes to work with.
So the dataflow looks like this:
GetFile -> UpdateAttribute (adding avro schema name) -> ConvertRecord -> SplitJson -> EvaluateJsonPath -> ...
So my problem is:
I want to work with exactly the last entry in this CSV file, or respectively the last JSON record.
I have already searched Google and found suggested approaches that use the ${record.count} attribute (written by ConvertRecord) or the ${fragment.count} attribute (written by SplitJson) to get the index of the last line/record.
But I don't know how/where to use these attributes to get just the last entry of my text file/JSON record.
Maybe I need to route this with a RouteOnAttribute processor, but I don't know what the property would have to look like.
I hope my question/problem is clear despite my broken English.
Thanks
Created 11-07-2017 11:15 AM
I think you can explore TailFile or ExecuteStreamCommand (tail -n 1). I have not tested it, but it should work.
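A minimal sketch of how ExecuteStreamCommand could be configured for this (my assumption, untested like the suggestion itself; ExecuteStreamCommand pipes the incoming flowfile content to the command's stdin and writes the command's stdout back as the outgoing flowfile content):
Command Path: /usr/bin/tail
Command Arguments: -n;1
Argument Delimiter: ;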
Created on 11-07-2017 12:39 PM - edited 08-18-2019 12:33 AM
I tried the ExecuteProcess processor to tail the last line of the file /tmp/test.txt (tail -1 /tmp/test.txt) and put it into HDFS, with the following properties for ExecuteProcess and a schedule interval of 60 seconds:
Command: /usr/bin/tail
Command Arguments: -1;/tmp/test.txt
Argument Delimiter: ;
Created 11-07-2017 06:26 PM
I tried this solution, but it didn't work for me. But thanks anyway.
Created on 11-07-2017 01:33 PM - edited 08-18-2019 12:33 AM
Before the SplitJson processor, use an ExtractText processor and add a new property to the processor named
last_line as
^.*,(\{.*)\]$
Example:-
As you are converting CSV to JSON and then using the SplitJson processor to split the JSON array into individual messages,
let's say your JSON array looks like
[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}]
and you are interested only in
{"id":3,"name":"bar"} //which is the last record in the array
Use the ExtractText processor before SplitJson and add the new property last_line as shown above.
Extract Text processor Configs:-
In this processor, note the Max Capture Group Length property, which has a default value of 1024 (i.e. 1 KB) //it only captures 1 KB per capture group. If your last record (or message) is larger than 1 KB, increase this value; for example, a value of 4000 allows almost 4 KB.
Max Buffer Size has a default value of 1 MB //it specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. Configure this property according to your flowfile size.
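A minimal sketch of the ExtractText configuration (the screenshot from the original post is not available, so these values are reconstructed from the description above):
last_line: ^.*,(\{.*)\]$
Max Capture Group Length: 4000
Max Buffer Size: 1 MB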
The output of this processor will have a last_line attribute added to the flowfile, so you can reference it as ${last_line}.
Created 11-07-2017 02:13 PM
Hi @Shu, thanks for your help. My next question is: how can I get the values of "id" and "name" (in your example) to work with them as attributes, like I did before with EvaluateJsonPath? Is it possible to transform the last_line attribute into a flowfile, so I could work with it in EvaluateJsonPath? Or do you have another solution?
Created 11-07-2017 02:39 PM
My goal is to save these values in Cassandra, so I can build a CQL statement like: insert into tableA (id, name) values ('${id}', '${name}')
Created on 11-07-2017 04:01 PM - edited 08-18-2019 12:33 AM
If you want to extract those values, you can do it in two ways.
Method 1:- Not changing the contents of the flowfile:-
After the ExtractText processor, use an UpdateAttribute processor; we are going to prepare the id and name attributes there.
Add new properties to the UpdateAttribute processor:
1. Using the replaceAll function
id as
${last_line:replaceAll('.*"id":(.*)(?:,).*','$1')} //extracts the value after "id": and replaces the whole last_line attribute value with $1 (the first capture group)
name as
${last_line:replaceAll('.*"name":"([^"]*)".*','$1')} //extracts the value after "name": and replaces the whole last_line attribute value with $1 (the first capture group)
2. Using the jsonPath function
Instead of the replaceAll function, since the last_line attribute holds a JSON message, we can use the jsonPath function of the NiFi expression language:
id as
${last_line:jsonPath('$.id')}
name as
${last_line:jsonPath('$.name')}
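For example, with last_line = {"id":3,"name":"bar"}, ${last_line:jsonPath('$.id')} evaluates to 3 and ${last_line:jsonPath('$.name')} evaluates to bar.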
Both the replaceAll and jsonPath functions give the same result; it is easier to use the jsonPath function instead of replaceAll.
After this processor, the flowfile will have id and name attributes associated with it, so you can use them in your insert statement:
insert into tableA (id, name) values ('${id}', '${name}')
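One way to execute that statement (my assumption; the original post does not show this part): use a ReplaceText processor to set the flowfile content to the statement, then send it to PutCassandraQL, which executes the flowfile content as a CQL statement.
ReplaceText:
Replacement Strategy: Always Replace
Replacement Value: insert into tableA (id, name) values ('${id}', '${name}')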
Flow:-
GetFile -> UpdateAttribute (adding avro schema name) -> ConvertRecord -> ExtractText -> UpdateAttribute -> SplitJson -> EvaluateJsonPath -> ... -> insert statement
Method 2:- Changing the contents of the flowfile:-
Use a ReplaceText processor instead of the ExtractText processor //ReplaceText changes the flowfile contents
In this processor we are going to replace the whole content of the flowfile (i.e. the whole array) with the last record; a sketch of the configuration follows the example below.
Example:-
[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}] //input flowfile content
Output:-
{"id":3,"name":"bar"} //output flowfile content
Then use an EvaluateJsonPath processor to extract values from the contents of the flowfile.
Your goal is to prepare an insert statement, so keep Destination as flowfile-attribute in this processor.
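A minimal sketch of the EvaluateJsonPath configuration (the dynamic property names id and name are my choice):
Destination: flowfile-attribute
id: $.id
name: $.name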
Flow:-
GetFile -> UpdateAttribute (adding avro schema name) -> ConvertRecord -> ReplaceText -> EvaluateJsonPath -> insert into Cassandra
Both methods add the id and name attributes to the flowfile.
Method 1:- we do not change the contents of the flowfile.
Method 2:- we change the contents of the flowfile //replacing the whole content with the last record
You can decide which one better fits your use case.
Created 11-07-2017 06:25 PM
@Shu thank you, I used the 2nd method and it worked like a charm 🙂 Your answers are really nicely detailed and easy to follow.
Created 10-23-2018 01:24 PM
Did you solve this problem?
I have a very simple way:
you can use a JSONPath expression like $[-1].id in EvaluateJsonPath.
The index -1 refers to the last element of the array.