Created 09-08-2016 05:28 AM
I have a requirement to load a huge CSV file into Cassandra, triggered through a Kafka topic. I configured Apache NiFi to do this.
Flow:
The user has no control over the NiFi setup and only specifies the URL where the CSV is located. The web application writes the URL to a Kafka topic. NiFi fetches the file and inserts its rows into Cassandra.
How will I know that NiFi has inserted all the rows from the CSV file into Cassandra? I need to let the user know that the insert is done and display a page where they can see the unique values from the CSV.
Any help would be appreciated.
Created 09-08-2016 01:19 PM
This is actually a little bit challenging to do right now... I'm assuming that after you get the CSV from Kafka you then used SplitText to split it into individual lines, and then converted each of those to CQL somehow?
There currently isn't a great way to verify that all of the flow files from the original CSV have reached a certain point in the flow, but there are a few JIRA tickets open to be able to do something like this. The idea would be to have a processor that acts as a barrier after the Cassandra processor and waits for all N flow files before allowing anything to proceed; at that point you could send a notification to the user. One of the relevant JIRAs is this one: https://issues.apache.org/jira/browse/NIFI-1926
Created 09-09-2016 01:33 AM
Yes, I am using PutCassandraQL to write into Cassandra, after converting each CSV line into a CQL statement. If I can't do it with NiFi, can I use Spark, Kafka or Storm to implement my requirement?
Created 09-09-2016 12:25 PM
Just to clarify, you used SplitText to split the lines of the CSV, right?
If that is true, then one thing you can try in NiFi is to send the success relationship of PutCassandraQL to a MergeContent processor, and set MergeContent's "Merge Strategy" to "Defragment". Defragment mode merges together all flow files that have the same value for an attribute called "fragment.identifier". The SplitText processor writes three attributes on all the child flow files it creates - "fragment.identifier", "fragment.count", and "fragment.index", so your flow files would be able to be defragmented by MergeContent.
Once MergeContent has defragmented them, you can notify the user however you like, for example with the PutEmail processor.
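To make the Defragment idea concrete, here is a rough Python sketch of the bookkeeping involved. It is not NiFi code and not how MergeContent is actually implemented; the class name `DefragmentTracker` and its `offer` method are made up for illustration. The only things taken from NiFi are the three attribute names that SplitText writes, and the sketch assumes their values arrive as strings.

```python
from collections import defaultdict


class DefragmentTracker:
    """Hypothetical helper: tracks which fragments of each split have arrived."""

    def __init__(self):
        # Maps fragment.identifier -> set of fragment.index values seen so far.
        self._seen = defaultdict(set)

    def offer(self, attributes):
        """Record one flow file's attributes; return True once its group is complete."""
        ident = attributes["fragment.identifier"]
        count = int(attributes["fragment.count"])
        index = int(attributes["fragment.index"])
        self._seen[ident].add(index)  # a set ignores duplicate deliveries
        if len(self._seen[ident]) == count:
            del self._seen[ident]  # group is complete, free the bookkeeping
            return True
        return False


tracker = DefragmentTracker()
results = [
    tracker.offer({
        "fragment.identifier": "csv-1",  # made-up identifier for the example
        "fragment.count": "3",
        "fragment.index": str(i),
    })
    for i in (2, 1, 3)  # fragments may arrive in any order
]
print(results)
```

Only the last fragment of a group reports completion, which is the point where the notification to the user would be sent. In the real flow this happens when MergeContent emits the merged flow file on its "merged" relationship.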
Created 09-12-2016 11:58 PM
Yes. That is exactly what I wanted. Thank you!