Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

Solved Go to solution

Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

Contributor

Consuming kafka json events and filtering on event type & event name and then finally save them to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

1 ACCEPTED SOLUTION

Accepted Solutions
Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

I think kafka->Storm->Cassandra is batter as similar architecture worked very well in our use case...

https://www.linkedin.com/pulse/real-time-data-ingestion-easy-simple-co-dev-neeraj-sabharwal

View solution in original post

8 REPLIES 8
Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

I think kafka->Storm->Cassandra is batter as similar architecture worked very well in our use case...

https://www.linkedin.com/pulse/real-time-data-ingestion-easy-simple-co-dev-neeraj-sabharwal

View solution in original post

Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

Contributor
@Mukesh Kumar

i think you are right but just wondering for only filtering the events dont u think its a overkill to use storm as we have to implement lot of code and maintain a big cluster for storm whereas we can do it quickly in Nifi. What do u say?

Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

Both approaches could work. This type of filtering task is well suited for NiFi and you could likely use the EvaluateJsonPath and RouteOnAttribute processors to perform the filtering, and the PutCassandraQL processor to insert to Cassandra, and not have to write any code.

Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

Contributor

@Bryan Bende i was looking to use Nifi as well but not sure if it would work. However i have created the PutCassandraQL processor and in terms of insert query not sure how to pass it as input to the putcassandra processor? My Kafka json event is complex and nested. Currently iam filtering on eventName using EvaluateJsonPath and RouteOnAttribute processors but i need to pass the input to putcassandra processor in a cql insert query format i guess and iam not aware of it. Any suggestions to pass the insert query to putcassandra. thank u

Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

I personally haven't used the PutCassandraQL processor so @Matt Burgess may know more than me, but I think it expects the content of the FlowFile to contains a CQL statement using ? to escape the parameters.

So lets say your insert statement is, you could use ReplaceText processor and set the Replacement Value property to something like this:

INSERTINTO mytable (field1, field2) VALUES (?, ?)

Before getting to that processor you would need to setup the following FlowFile attributes:

cql.args.1.value = value1
cql.args.1.type = string
cql.args.2.value = value2
cql.args.2.type = string

The value attributes could come from EvaluateJsonPath extracting the appropriate values, and the type attributes could be added by UpdateAttribute.

So your full flow might be:

GetKafka -> EvaluateJsonPath -> RouteOnAttribute -> UpdateAttribute -> ReplaceText -> PutCassandraQL.

There is an example NiFi template for working with Cassandra here:

https://github.com/hortonworks-gallery/nifi-templates/blob/master/templates/CassandraProcessors.xml

I have not used it before so I can't say what it demonstrates, but it might be helpful to look at.

Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

Guru

You could use PutCassandraQL with the insert JSON syntax in CQL. This way all you need to do is GetKafka -> ReplaceText to prepend:

INSERT INTO namespace.table JSON '

and append:

'

This will wrap your JSON document in an INSERT statement if you have a single document per FlowFile. Note that you can use SplitJSON before this if this is not the case. You can then pipe that directly into PutCassandraQL.

Note that if you want to batch up entries inserted into Cassandra, then you can use the MergeContent processors and the Header, Footer and Demarcator properties to achieve a similar transformation.

Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

Contributor

@Simon Elliston Ball thanks for the reply. Iam trying to filter the kafka json events on eventname and based on the eventaname need to insert the events into cassandra. So i believe i need EvaluateJsonPath -> RouteOnAttribute -> UpdateAttribute for filtering events with event Name? thanks

Highlighted

Re: Kafka json events filter on event name & event type and save to cassandra. Which is better kafka->Storm->Cassandra or kafka->Nifi->Cassandra?

Contributor

@Simon Elliston Ball PutCassandraQL doesnt have a in built property to write CQL. So i used one of the example templates from github which uses ExecuteScript processor and the values inserted through executescript processor are hardcoded and i want to use my json values i.e. insert values from my Json event not hardcoded values. Any suggestions plz. https://github.com/hortonworks-gallery/nifi-templates/blob/master/templates/CassandraProcessors.xml

Currently iam using this flow to put json in cassandra. GetKafka->EvaluateJson->RouteOnAttribute->ExecuteScript->PutCassandraSQL.

Don't have an account?
Coming from Hortonworks? Activate your account here