[APACHE NIFI] How to route just the last line or record of a textfile/json file

Contributor

Hi folks,

I have a dataflow where I convert a text file with CSV structure into JSON records.

After the file is converted, I split the output flowfile at $.*

After that, I use the EvaluateJsonPath processor to extract attributes to work with.

So the dataflow looks like this:

GetFile -> UpdateAttribute (adding avro schema name) -> ConvertRecord -> SplitJson -> EvaluateJsonPath -> ....

My problem is:

I want only the last entry in this CSV file, i.e. the last JSON record, to work with.

I already searched on Google and found approaches that use the ${record.count} attribute (set by ConvertRecord) or the ${fragment.count} attribute (set by SplitJson) to get the index of the last line/record.

But I don't know how or where to use these attributes to get just the last entry of my text file/JSON record.

Maybe I need to route this with the RouteOnAttribute processor, but I don't know what the property has to look like.

I hope my question is clear and you can handle my broken English.

Thx

1 ACCEPTED SOLUTION

Master Guru
@Andy Gisbo

Before the SplitJson processor, use an ExtractText processor and add a new property named last_line with this value:

^.*\,(\{.*)]$

Example:-

You are converting CSV to JSON and then using the SplitJson processor to split the JSON array into individual messages.

Let's say your JSON array looks like this:

[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}]

and you are interested only in

{"id":3,"name":"bar"} //which is the last record in the array

Note that the pattern needs a ",{" before the last record, so it will not match an array that holds only one record.

ExtractText processor configs:-

42485-extract.png

In this processor you can see the Max Capture Group Length property, which has a default value of 1024 (i.e. 1 KB), so only the first 1 KB of a capture group is kept. If your last record is larger than 1 KB, increase this value; as you can see above, I have set it to 4000 (i.e. almost 4 KB).

Max Buffer Size has a default value of 1 MB. It specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions, so configure this property according to your flowfile size.

The processor adds a last_line attribute to the flowfile, which you can reference as ${last_line}.

42486-last-line-attr.png
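
To see what the last_line pattern captures, here is a minimal sketch in Python. It assumes Python's re module treats this particular pattern the same way as the Java regex engine used by NiFi, and the helper name last_record is made up for illustration.

```python
import re

# Same pattern as the last_line property above.
LAST_RECORD = re.compile(r'^.*\,(\{.*)]$')

def last_record(json_array_text):
    """Return the text of the last object in a single-line JSON array, or None if there is no match."""
    match = LAST_RECORD.match(json_array_text)
    return match.group(1) if match else None

content = '[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}]'
print(last_record(content))                    # {"id":3,"name":"bar"}
print(last_record('[{"id":1,"name":"foo"}]'))  # None: no ",{" precedes the only record
```

The greedy leading .* is what pushes the capture to the last ",{" in the content, which is why only the final record ends up in the group.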


9 REPLIES 9

Master Mentor

@Andy Gisbo

I think you can explore the TailFile and ExecuteStreamCommand processors. I have not tested it, but it should work:

(tail -n 1)


Master Mentor

@Andy Gisbo

I tried the ExecuteProcess processor to tail the last line of the file /tmp/test.txt (tail -1 /tmp/test.txt) and put it into HDFS, using the following properties for ExecuteProcess and a schedule interval of 60 seconds.

Command: /usr/bin/tail
Command Arguments: -1;/tmp/test.txt
Argument Delimiter: ;


42484-executeprocess-to-hdfs.png
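
For reference, what tail -1 returns can be sketched in a few lines of Python. This is only an illustration of the command's effect, not part of the NiFi flow, and the function name last_line is hypothetical.

```python
from collections import deque

def last_line(path):
    """Return the final line of a text file without its trailing newline, like `tail -n 1`."""
    with open(path, encoding="utf-8") as handle:
        # A deque with maxlen=1 keeps only the most recent line while iterating.
        tail = deque(handle, maxlen=1)
    return tail[0].rstrip("\n") if tail else None
```

Calling last_line("/tmp/test.txt") would give the same line the ExecuteProcess configuration above emits, assuming the file is UTF-8 text.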


Contributor

I tried this solution, but it didn't work for me. Thanks anyway.


Contributor

Hi @Shu, thx for your help. My next question is: how can I get the values of "id" and "name" (in your example) to work with them as attributes, like I did before with EvaluateJsonPath? Is it possible to turn the "last_line" attribute into a flowfile so I can work with it in EvaluateJsonPath? Or do you have another solution?

Contributor

My goal is to save these values in Cassandra, so I could build a CQL statement like: insert into tableA (id, name) values ('${id}', '${name}')

Master Guru

@Andy Gisbo,

If you want to extract those values, you can do that in 2 ways.

Method1:- Without changing the contents of the flowfile:-

After the ExtractText processor, use an UpdateAttribute processor. In this processor we prepare the id and name attributes by adding new properties.

1. Using the replaceAll function:-

id as

${last_line:replaceAll('.*"id":(.*)(?:,).*','$1')} //extracts the value after "id": and replaces the whole last_line attribute value with $1 (the first capturing group)

name as

${last_line:replaceAll('.*"name":"([^"]*)".*','$1')} //extracts the quoted value after "name": and replaces the whole last_line attribute value with $1 (the first capturing group)

Update Attribute Configs:-

42492-update.png

2. Using the jsonPath function:-

Because the last_line attribute holds a JSON message, we can use the jsonPath function of the NiFi expression language instead of replaceAll.

id as

${last_line:jsonPath('$.id')}

name as

${last_line:jsonPath('$.name')}

Configs:-

42493-update.png

Both the replaceAll and jsonPath functions give the same result, but jsonPath is easier to use and the better choice.

After this processor the flowfile has the id and name attributes, so you can use them in your insert statement

insert into tableA (id, name) values ('${id}', '${name}')

Flow:-

GetFile -> UpdateAttribute (adding avro schema name) -> ConvertRecord -> ExtractText -> UpdateAttribute -> SplitJson -> EvaluateJsonPath -> .... -> insert statement
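
The two attribute-only options can be compared outside NiFi with a small Python sketch. It uses re.sub and json.loads as stand-ins for replaceAll and jsonPath, so it illustrates the idea rather than NiFi itself. The name pattern with [^"]* is an assumption chosen so that it matches the example value "bar".

```python
import json
import re

last_line = '{"id":3,"name":"bar"}'  # value of the last_line attribute in the example

# Regex route: replace the whole string with the first capture group.
id_by_regex = re.sub(r'.*"id":(.*)(?:,).*', r'\1', last_line)
name_by_regex = re.sub(r'.*"name":"([^"]*)".*', r'\1', last_line)

# JSON route: parse the record and read the fields directly.
record = json.loads(last_line)
id_by_json = str(record["id"])
name_by_json = record["name"]

# Both routes feed the same insert statement from the thread.
statement = f"insert into tableA (id, name) values ('{id_by_json}', '{name_by_json}')"
print(id_by_regex, name_by_regex)
print(statement)
```

The id pattern stops at the last comma in the record, so with more than two fields it would capture more than the id. Parsing the JSON does not have that limitation.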

Method2:- Changing the contents of the flowfile:-

Use a ReplaceText processor instead of the ExtractText processor //ReplaceText changes the flowfile contents

42489-replace.png

In this processor we replace the whole content of the flowfile (i.e. the whole array) with the last record.

Example:-

 [{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}] //input flowfile content

output:-

{"id":3,"name":"bar"} //output flowfile content

Then use the EvaluateJsonPath processor to extract values from the flowfile content

42490-eval-json.png

Your goal is to prepare an insert statement, so keep Destination as flowfile-attribute in this processor.

Output:-

42491-output.png

Flow:-

GetFile -> UpdateAttribute (adding avro schema name) -> ConvertRecord -> ReplaceText -> EvaluateJsonPath -> insert to cassandra.
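
The content-replacing flow can be sketched in Python as follows. The ReplaceText settings are only visible in the screenshot, so the search pattern (the same regex as in ExtractText) and the replacement ($1, the first capture group) are assumptions here.

```python
import json
import re

content = '[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}]'  # input flowfile content

# ReplaceText step (assumed settings): swap the whole array for its last record.
new_content = re.sub(r'^.*\,(\{.*)]$', r'\1', content)

# EvaluateJsonPath step with Destination = flowfile-attribute: values end up as string attributes.
record = json.loads(new_content)
attributes = {"id": str(record["id"]), "name": record["name"]}

print(new_content)  # {"id":3,"name":"bar"}
print(attributes)   # {'id': '3', 'name': 'bar'}
```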

Both methods add the id and name attributes to the flowfile.

Method1:- we do not change the contents of the flowfile

Method2:- we change the contents of the flowfile //replacing the whole content with the last record

You can decide which one better fits your use case.

Contributor

@Shu thank you, I used the 2nd method and it worked like a charm 🙂 Your answers are really nicely detailed and easy to follow.

New Contributor

Did you solve this problem?

I have a very simple way.

You can use a JSONPath expression like "$[-1].id" in EvaluateJsonPath.

"-1" refers to the last element of the array.
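
Python lists use the same negative-index convention, so the idea can be shown with a short sketch. This is plain Python indexing on the parsed array, not a JSONPath evaluator.

```python
import json

content = '[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}]'

records = json.loads(content)
last_id = records[-1]["id"]      # analogous to $[-1].id
last_name = records[-1]["name"]  # analogous to $[-1].name

print(last_id, last_name)  # 3 bar
```

Unlike the regex approach, indexing the parsed array also works when the array holds only one record.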