Created 07-05-2016 02:12 PM
Consuming Kafka JSON events, filtering on event type and event name, and then finally saving them to Cassandra. Which is better: Kafka -> Storm -> Cassandra or Kafka -> NiFi -> Cassandra?
I think you are right, but just wondering: for only filtering the events, don't you think it's overkill to use Storm, since we would have to implement a lot of code and maintain a big cluster for Storm, whereas we can do it quickly in NiFi? What do you say?
Both approaches could work. This type of filtering task is well suited for NiFi: you could likely use the EvaluateJsonPath and RouteOnAttribute processors to perform the filtering, and the PutCassandraQL processor to insert into Cassandra, without having to write any code.
@Bryan Bende I was looking to use NiFi as well, but wasn't sure if it would work. However, I have created the PutCassandraQL processor, and I'm not sure how to pass the insert query to it as input. My Kafka JSON event is complex and nested. Currently I am filtering on eventName using the EvaluateJsonPath and RouteOnAttribute processors, but I guess I need to pass the input to the PutCassandraQL processor in a CQL insert-query format, and I don't know how. Any suggestions on how to pass the insert query to PutCassandraQL? Thank you.
I personally haven't used the PutCassandraQL processor, so @Matt Burgess may know more than me, but I think it expects the content of the FlowFile to contain a CQL statement using ? as placeholders for the parameters.
So let's say your insert statement is the one below; you could use the ReplaceText processor and set the Replacement Value property to something like this:
INSERT INTO mytable (field1, field2) VALUES (?, ?)
Before getting to that processor you would need to set up the following FlowFile attributes:
cql.args.1.value = value1
cql.args.1.type = string
cql.args.2.value = value2
cql.args.2.type = string
The value attributes could come from EvaluateJsonPath extracting the appropriate values, and the type attributes could be added by UpdateAttribute.
So your full flow might be:
GetKafka -> EvaluateJsonPath -> RouteOnAttribute -> UpdateAttribute -> ReplaceText -> PutCassandraQL.
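Putting those steps together, a rough sketch of the processor configuration could look like the following (the JSON paths, field names, and table name are hypothetical placeholders; adjust them to your event schema):

```
EvaluateJsonPath (Destination = flowfile-attribute):
  cql.args.1.value = $.field1
  cql.args.2.value = $.field2

UpdateAttribute:
  cql.args.1.type = string
  cql.args.2.type = string

ReplaceText (Replacement Value):
  INSERT INTO mykeyspace.mytable (field1, field2) VALUES (?, ?)
```

PutCassandraQL then reads the statement from the FlowFile content and binds the cql.args.N.value attributes to the ? placeholders in order.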
There is an example NiFi template for working with Cassandra here:
I have not used it before so I can't say what it demonstrates, but it might be helpful to look at.
You could use PutCassandraQL with the insert JSON syntax in CQL. This way all you need to do is GetKafka -> ReplaceText to prepend:
INSERT INTO namespace.table JSON '
This will wrap your JSON document in an INSERT statement if you have a single document per FlowFile. Note that you can use SplitJson before this if that is not the case. You can then pipe that directly into PutCassandraQL.
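As a hedged sketch, one way to do that wrapping in a single ReplaceText is a whole-content regex replace (the keyspace and table names are placeholders). Note that the closing single quote after the JSON body has to be added too, not just the prefix, for the CQL string literal to be balanced:

```
ReplaceText:
  Replacement Strategy = Regex Replace
  Evaluation Mode      = Entire text
  Search Value         = (?s)^(.*)$
  Replacement Value    = INSERT INTO mykeyspace.mytable JSON '$1';
```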
Note that if you want to batch up entries inserted into Cassandra, you can use the MergeContent processor and its Header, Footer and Demarcator properties to achieve a similar transformation.
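For example, assuming one INSERT ... JSON statement per incoming FlowFile (with no trailing semicolon), a MergeContent configuration along these lines could wrap several of them into a single CQL batch, which PutCassandraQL can then execute as one statement:

```
MergeContent:
  Header     = BEGIN BATCH\n
  Demarcator = ;\n
  Footer     = ;\nAPPLY BATCH;
```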
@Simon Elliston Ball thanks for the reply. I am trying to filter the Kafka JSON events on eventName, and based on the eventName I need to insert the events into Cassandra. So I believe I need EvaluateJsonPath -> RouteOnAttribute -> UpdateAttribute to filter events by event name? Thanks
@Simon Elliston Ball PutCassandraQL doesn't have a built-in property for writing the CQL. So I used one of the example templates from GitHub which uses the ExecuteScript processor, but the values inserted through the ExecuteScript processor are hardcoded, and I want to use my JSON values, i.e. insert values from my JSON event, not hardcoded values. Any suggestions please? https://github.com/hortonworks-gallery/nifi-templates/blob/master/templates/CassandraProcessors.xml
Currently I am using this flow to put JSON into Cassandra: GetKafka -> EvaluateJsonPath -> RouteOnAttribute -> ExecuteScript -> PutCassandraQL.
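Since ExecuteScript supports Jython among other engines, the hardcoded values could be replaced by building the statement from the event itself. Here is a minimal sketch in plain Python (outside NiFi, so it leaves out the NiFi session/FlowFile plumbing) of turning a parsed JSON event into an INSERT ... JSON statement; the keyspace and table `mykeyspace.events` and the sample field names are hypothetical:

```python
import json

def to_insert_json_cql(event, table="mykeyspace.events"):
    """Build a CQL "INSERT ... JSON" statement from a parsed JSON event.

    Single quotes inside the JSON body are doubled, following CQL
    string-literal escaping rules.
    """
    body = json.dumps(event).replace("'", "''")
    return "INSERT INTO %s JSON '%s';" % (table, body)

# Example event shaped like the ones being filtered on eventName:
event = {"eventName": "login", "eventType": "auth", "userId": 42}
print(to_insert_json_cql(event))
```

Inside ExecuteScript the same function would be applied to the FlowFile content, with the result written back as the new content before routing to PutCassandraQL.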