Filter kafka Json events based on eventname and insert into cassandra using Nifi

Expert Contributor

I am looking for suggestions on inserting JSON into Cassandra. I need to filter JSON events from a Kafka topic based on EventName and pass the whole JSON event on. For example, if an event has eventname="LoginSucceeded", I need to send the entire event to the next processor, not just a single property of it. I looked at EvaluateJsonPath, RouteOnAttribute and AttributesToJSON, but they pass on only a subset of properties, not the whole JSON event, when the EventName matches.

I would also like to know, if I use RouteOnAttribute, how to pass a subset of the JSON properties to PutCassandraQL. PutCassandraQL doesn't have a built-in property for writing the CQL statement, so I used one of the example templates from GitHub: https://github.com/hortonworks-gallery/nifi-templates/blob/master/templates/CassandraProcessors.xml

That template uses an ExecuteScript processor to insert hardcoded values into Cassandra, but I want to insert values from my JSON event instead. Any suggestions, please?

Currently I am using this flow to put JSON into Cassandra: GetKafka -> EvaluateJsonPath -> RouteOnAttribute -> ExecuteScript -> PutCassandraQL.

The flow is broken because I am stuck on how to pass my JSON properties from RouteOnAttribute to ExecuteScript.

Thank you

1 ACCEPTED SOLUTION

Master Guru

In EvaluateJsonPath, you can choose "flowfile-attribute" as the Destination. The original JSON then stays in the flow file content, and any extracted JSON elements are stored in the flow file's attributes. The flow file can then go into RouteOnAttribute, which routes on the "eventname" attribute. After that, you can use ReplaceText (or ExecuteScript if you prefer) to create a CQL statement, using Expression Language to insert the values from your attributes, or to wrap the entire JSON object in a CQL statement.
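To make the data flow concrete, here is a minimal standalone Python sketch of the same three steps (extract attributes, route on the event name, build a CQL statement from attributes). It is not NiFi code and does not use any NiFi API. The helper names, the login_events table, the UserName field and the column names are all hypothetical, and every value is written as a text literal for simplicity.

```python
import json


def extract_attributes(content, paths):
    """Mimic EvaluateJsonPath with Destination = flowfile-attribute.

    `paths` maps an attribute name to a top-level JSON key. The content
    string itself is left untouched, so the whole event is still available.
    """
    event = json.loads(content)
    return {attr: str(event[key]) for attr, key in paths.items() if key in event}


def matches_event(attributes, wanted):
    """Mimic the RouteOnAttribute check on the 'eventname' attribute."""
    return attributes.get("eventname") == wanted


def cql_literal(value):
    """Quote a string as a CQL text literal, doubling single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_insert(attributes, table, columns):
    """Build an INSERT statement from selected attributes.

    Assumes (hypothetically) that column names equal attribute names
    and that every column is of a text type.
    """
    names = ", ".join(columns)
    values = ", ".join(cql_literal(attributes[c]) for c in columns)
    return "INSERT INTO {} ({}) VALUES ({});".format(table, names, values)
```

Only events for which matches_event returns True would be passed to build_insert; the original JSON string is never modified along the way, which is the point of writing extracted values to attributes rather than to content.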

I have a template that uses ReplaceText to put an entire JSON object into an "INSERT INTO myTable JSON" CQL statement; it is available as a Gist (here). It doesn't have a PutCassandraQL processor at the end. Instead it ends with a LogAttribute processor, so you can check whether the CQL looks right for what you're trying to do.
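As a rough illustration of what wrapping the whole event produces, here is a small Python sketch that builds an "INSERT INTO myTable JSON" statement from a JSON string. It is not the Gist template and not NiFi code; the function name wrap_json_in_cql is hypothetical. The sketch assumes the JSON keys match the table's column names and that single quotes inside the JSON need to be doubled to stay valid inside the CQL string literal.

```python
import json


def wrap_json_in_cql(content, table):
    """Wrap a whole JSON event in an INSERT ... JSON statement.

    Raises ValueError if `content` is not valid JSON, so malformed
    events are caught before a statement is built.
    """
    json.loads(content)  # validation only; the original text is reused as-is
    escaped = content.strip().replace("'", "''")
    return "INSERT INTO {} JSON '{}';".format(table, escaped)
```

Logging the resulting string first, as the template does with LogAttribute, is a cheap way to confirm the statement looks right before sending it to Cassandra.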
