Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

avatar
Expert Contributor

Consuming kafka json events and filtering on event type & event name and then finally save them to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

1 ACCEPTED SOLUTION

avatar

I think kafka->Storm->Cassandra is batter as similar architecture worked very well in our use case...

https://www.linkedin.com/pulse/real-time-data-ingestion-easy-simple-co-dev-neeraj-sabharwal

View solution in original post

8 REPLIES 8

avatar

I think kafka->Storm->Cassandra is batter as similar architecture worked very well in our use case...

https://www.linkedin.com/pulse/real-time-data-ingestion-easy-simple-co-dev-neeraj-sabharwal

avatar
Expert Contributor
@Mukesh Kumar

i think you are right but just wondering for only filtering the events dont u think its a overkill to use storm as we have to implement lot of code and maintain a big cluster for storm whereas we can do it quickly in Nifi. What do u say?

avatar
Master Guru

Both approaches could work. This type of filtering task is well suited for NiFi and you could likely use the EvaluateJsonPath and RouteOnAttribute processors to perform the filtering, and the PutCassandraQL processor to insert to Cassandra, and not have to write any code.

avatar
Expert Contributor

@Bryan Bende i was looking to use Nifi as well but not sure if it would work. However i have created the PutCassandraQL processor and in terms of insert query not sure how to pass it as input to the putcassandra processor? My Kafka json event is complex and nested. Currently iam filtering on eventName using EvaluateJsonPath and RouteOnAttribute processors but i need to pass the input to putcassandra processor in a cql insert query format i guess and iam not aware of it. Any suggestions to pass the insert query to putcassandra. thank u

avatar
Master Guru

I personally haven't used the PutCassandraQL processor so @Matt Burgess may know more than me, but I think it expects the content of the FlowFile to contains a CQL statement using ? to escape the parameters.

So lets say your insert statement is, you could use ReplaceText processor and set the Replacement Value property to something like this:

INSERTINTO mytable (field1, field2) VALUES (?, ?)

Before getting to that processor you would need to setup the following FlowFile attributes:

cql.args.1.value = value1
cql.args.1.type = string
cql.args.2.value = value2
cql.args.2.type = string

The value attributes could come from EvaluateJsonPath extracting the appropriate values, and the type attributes could be added by UpdateAttribute.

So your full flow might be:

GetKafka -> EvaluateJsonPath -> RouteOnAttribute -> UpdateAttribute -> ReplaceText -> PutCassandraQL.

There is an example NiFi template for working with Cassandra here:

https://github.com/hortonworks-gallery/nifi-templates/blob/master/templates/CassandraProcessors.xml

I have not used it before so I can't say what it demonstrates, but it might be helpful to look at.

avatar
Guru

You could use PutCassandraQL with the insert JSON syntax in CQL. This way all you need to do is GetKafka -> ReplaceText to prepend:

INSERT INTO namespace.table JSON '

and append:

'

This will wrap your JSON document in an INSERT statement if you have a single document per FlowFile. Note that you can use SplitJSON before this if this is not the case. You can then pipe that directly into PutCassandraQL.

Note that if you want to batch up entries inserted into Cassandra, then you can use the MergeContent processors and the Header, Footer and Demarcator properties to achieve a similar transformation.

avatar
Expert Contributor

@Simon Elliston Ball thanks for the reply. Iam trying to filter the kafka json events on eventname and based on the eventaname need to insert the events into cassandra. So i believe i need EvaluateJsonPath -> RouteOnAttribute -> UpdateAttribute for filtering events with event Name? thanks

avatar
Expert Contributor

@Simon Elliston Ball PutCassandraQL doesnt have a in built property to write CQL. So i used one of the example templates from github which uses ExecuteScript processor and the values inserted through executescript processor are hardcoded and i want to use my json values i.e. insert values from my Json event not hardcoded values. Any suggestions plz. https://github.com/hortonworks-gallery/nifi-templates/blob/master/templates/CassandraProcessors.xml

Currently iam using this flow to put json in cassandra. GetKafka->EvaluateJson->RouteOnAttribute->ExecuteScript->PutCassandraSQL.