Created on 09-11-2017 07:09 PM - edited 08-17-2019 11:11 AM
This tutorial walks you through a NiFi flow that uses the PublishKafkaRecord_0_10 processor to easily convert a CSV file into JSON and then publish it to Kafka. The tutorial is based on the blog "Integrating Apache NiFi with Apache Kafka", updated with the more recent record-oriented processors and controller services available in NiFi.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, this tutorial must be run on version 1.2.0 or later.
This tutorial was tested using the following environment and components:
Here is a template of the flow discussed in this tutorial: publishkafkarecord.xml
The flow in this demo utilizes the PublishKafkaRecord_0_10 processor, which, as the name implies, uses the Kafka 0.10.x Producer API. As a result, a 0.10.x version of Kafka is required for this tutorial. For my environment, I downloaded and installed Kafka 0.10.2.1 (Scala 2.11).
In the bin directory of your Kafka install:
Start ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
Start Kafka:
./kafka-server-start.sh ../config/server.properties
Create Kafka Topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Movies
Start Kafka Consumer:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic Movies --from-beginning
Start NiFi. Import the provided template and add it to the canvas. You should see the following flow on your NiFi canvas:
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
The flow is now ready to run.
Here is a quick overview of the flow:
1. GetHTTP pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website
2. UnpackContent unzips the file
3. RouteOnAttribute sends only the movie title information onward in the flow
4. UpdateAttribute adds Schema Name "movies" as an attribute to the flowfile
5. PublishKafkaRecord_0_10 converts the flowfile contents from CSV to JSON and publishes the records to the Kafka topic "Movies"
Let's look at each of the processors in the flow in detail:
Get Movie Data (GetHTTP Processor)
This processor pulls a zip file from the website MovieLens, a movie recommendation service. The dataset (ml-20m.zip) contains 20,000,263 ratings and 465,564 tag applications across 27,278 movies.
Looking at the processor's configuration:
Start the processor to retrieve the file:
Note: On the Scheduling tab, the Run Schedule is set to 10 minutes instead of the default 0 sec, so the processor only periodically checks whether the data file has been updated instead of constantly polling:
Unzip (UnpackContent Processor)
The next processor is UnpackContent which unzips the "ml-20m.zip" file:
Running the processor unzips the archive into 7 separate files (six CSVs plus a README): movies.csv, ratings.csv, tags.csv, links.csv, genome-scores.csv, genome-tags.csv, and README.txt:
RouteOnAttribute Processor
RouteOnAttribute is next. Looking at its configuration:
the processor routes the flowfiles to different connections depending on the file name (movies.csv, ratings.csv, tags.csv).
For the purposes of this demo, we are only interested in publishing the movie title data to Kafka. As such, we make the connection to the next processor (UpdateAttribute) using the "movies" relationship and auto-terminate the others:
Run the RouteOnAttribute processor to send only the movie title data:
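Conceptually, the routing this processor performs can be sketched in Python (an illustrative stand-in, not NiFi code; the mapping mirrors the filename-based relationships described above, with non-matching flowfiles falling through to RouteOnAttribute's built-in "unmatched" relationship):

```python
def route(flowfile_attributes):
    """Pick a relationship name based on the flowfile's filename attribute,
    mimicking what RouteOnAttribute does with its routing properties."""
    routes = {
        "movies.csv": "movies",
        "ratings.csv": "ratings",
        "tags.csv": "tags",
    }
    # Anything that matches no routing property goes to "unmatched"
    return routes.get(flowfile_attributes.get("filename"), "unmatched")

print(route({"filename": "movies.csv"}))   # movies
print(route({"filename": "README.txt"}))   # unmatched
```

In the actual flow, only the "movies" relationship is connected to the next processor; the other relationships are auto-terminated.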
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "movies" to the flowfile:
Start the processor, and view the attributes of the flowfile to confirm this:
You can also confirm that the contents of the flowfile are in CSV format at this point in the flow:
Publish to "Movies" Topic (PublishKafkaRecord_0_10 Processor)
The final processor is PublishKafkaRecord_0_10. Looking at its configuration:
The Kafka Brokers property is set to "localhost:9092" and the Topic Name property is set to "Movies". Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines its schema. The "JsonRecordSetWriter" controller service then writes those records out as JSON.
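To illustrate what this reader/writer pair does, here is a minimal Python sketch (not NiFi code; the function name and sample row are illustrative assumptions) that parses a movies.csv row and emits it as JSON with the field types declared in the "movies" schema:

```python
import csv
import io
import json

# Field -> Python type, per the "movies" Avro schema
# (movieId is a long; title and genres are nullable strings)
SCHEMA_TYPES = {"movieId": int, "title": str, "genres": str}

def csv_to_json_records(csv_text):
    """Parse CSV records and emit them as a JSON array, roughly what
    CSVReader + JsonRecordSetWriter do inside PublishKafkaRecord_0_10."""
    reader = csv.DictReader(io.StringIO(csv_text))
    records = []
    for row in reader:
        record = {}
        for field, cast in SCHEMA_TYPES.items():
            value = row.get(field)
            # title/genres are union ["null","string"] in the schema
            record[field] = cast(value) if value not in (None, "") else None
        records.append(record)
    return json.dumps(records)

sample = "movieId,title,genres\n1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy\n"
print(csv_to_json_records(sample))
```

In NiFi itself, none of this is hand-written: the controller services handle the parsing, type coercion, and serialization based on the registered schema.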
CSVReader Controller Service
Select the arrow icon next to "CSVReader", which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader looks up the expected schema by the name carried in a flowfile attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service, which defines the "movies" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
{
  "type": "record",
  "name": "MoviesRecord",
  "fields": [
    { "name": "movieId", "type": "long" },
    { "name": "title", "type": ["null", "string"] },
    { "name": "genres", "type": ["null", "string"] }
  ]
}
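Since an Avro schema is plain JSON, you can sanity-check it programmatically before pasting it into the AvroSchemaRegistry; a small Python sketch (illustrative only):

```python
import json

# The "movies" schema exactly as registered in AvroSchemaRegistry
movies_schema = """
{ "type": "record", "name": "MoviesRecord", "fields" : [
  {"name": "movieId", "type": "long"},
  {"name": "title",  "type": ["null", "string"]},
  {"name": "genres", "type": ["null", "string"]} ] }
"""

schema = json.loads(movies_schema)  # raises ValueError if malformed
field_names = [f["name"] for f in schema["fields"]]
print(field_names)  # ['movieId', 'title', 'genres']

# Fields whose type is a union containing "null" may be absent or null
nullable = [f["name"] for f in schema["fields"]
            if isinstance(f["type"], list) and "null" in f["type"]]
print(nullable)  # ['title', 'genres']
```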
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
See JSON in Kafka
Start the PublishKafkaRecord_0_10 processor and you will see the JSON movie data in your Kafka Consumer window:
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Created on 10-26-2017 06:10 PM
Hi,
now that this example is published, how can I consume this data from the Movies topic and insert it into a Cassandra table?
And is it maybe possible to create the needed Cassandra table directly in NiFi as well?
Could you please give me a solution nearly as detailed as this guide? I'm absolutely new and just trying out NiFi.
Created on 03-06-2018 06:10 AM
Hi, thanks for sharing this post.
Can you please help with a query to check whether the incoming data made it from NiFi to Kafka? How can we verify it in Kafka?
Also, I am using GetSFTP to get the CSV data from a remote machine.
Created on 03-06-2018 03:25 PM
Hi @hema moger
Sorry if I'm misunderstanding your questions, but I think you're asking how to see the results in the last screenshot (20-kafka-consumer.png). The kafka command I used was:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic Movies --from-beginning
Created on 03-07-2018 04:55 AM
Instead of --zookeeper we can mention --bootstrap-server, right? Here you're giving an example of data pulled from a website.
But I have to import the data from a remote machine (Linux), and I have used GetSFTP. Is that the right option? Anything better than this?
Please suggest.
Created on 03-07-2018 05:13 AM
And I got the below error.
Created on 03-07-2018 05:58 PM
Hi @hema moger
Could you share your entire flow? I'm curious to see what comes before your PublishKafkaRecord processor. It looks like you haven't set your schema.name attribute.
Created on 03-08-2018 04:46 AM
I am sharing the settings, kindly check: step-1.png, getsftp-settings.png, publishkafka.png, csvreadersettings.png, csv-writer-settings.png
Created on 03-09-2018 09:14 AM
Created on 03-09-2018 04:23 PM
A couple things:
Created on 03-12-2018 04:17 AM
1) My requirement: we have CSV data on a remote machine, and every hour a huge amount of CSV data is generated there.
So I used SFTP for that. From there I need to put the data into Kafka topics, so I used PublishKafkaRecord. I have not used the JSON conversion written in the above article, but I am going to use it.
2) Yes, I got the flow from your article. I missed the schema.name attribute and schema registry because I don't understand what to mention in these. Do I need to mention the column names that are in the input file?
3) I have just started learning Kafka. I created partitions in one of the topics, and I have no idea how to load data into specific partitions using NiFi.
If you have any suggestion better than this, can you guide me?