Objective

This tutorial walks you through a NiFi flow that uses the PublishKafkaRecord_0_10 processor to convert a CSV file into JSON and then publish it to Kafka. The tutorial is based on the blog "Integrating Apache Nifi with Apache Kafka", updated with the more recent record-oriented processors and controller services available in NiFi.

Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0, so this tutorial requires NiFi 1.2.0 or later.

Environment

This tutorial was tested using the following environment and components:

  • Mac OS X 10.11.6
  • Apache NiFi 1.3.0
  • Apache Kafka 0.10.2.1

PublishKafkaRecord_0_10 (CSV to JSON)

Support Files

Here is a template of the flow discussed in this tutorial: publishkafkarecord.xml

Demo Configuration

Kafka Download & Install

The flow in this demo uses the PublishKafkaRecord_0_10 processor, which, as the name implies, uses the Kafka 0.10.x Producer API. As a result, a 0.10.x version of Kafka is required for this tutorial. For my environment, I downloaded and installed Kafka 0.10.2.1 (Scala 2.11).

Kafka Configuration & Startup

In the bin directory of your Kafka install:

Start ZooKeeper: ./zookeeper-server-start.sh ../config/zookeeper.properties

Start Kafka: ./kafka-server-start.sh ../config/server.properties

Create Kafka Topic: ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Movies

Start Kafka Consumer: ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic Movies --from-beginning
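Before running the flow, you may want to confirm that both services are accepting connections. The following is a minimal Python sketch using only the standard library. It simply attempts a TCP connection and assumes the default ports: 2181 for ZooKeeper (as used in the commands above) and 9092 for the Kafka broker (the address the flow's Kafka Brokers property points to later). The function name is illustrative and not part of Kafka or NiFi.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False


if __name__ == "__main__":
    # Assumed default ports: ZooKeeper's clientPort and the broker port
    # used by the flow's Kafka Brokers property ("localhost:9092").
    for name, port in [("ZooKeeper", 2181), ("Kafka", 9092)]:
        state = "up" if is_listening("localhost", port) else "NOT reachable"
        print(f"{name} on localhost:{port}: {state}")
```

A successful connection only shows that something is listening on the port; it does not confirm that the Movies topic exists. Running ./kafka-topics.sh --list --zookeeper localhost:2181 is one way to check that.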

Import Template

Start NiFi. Import the provided template and add it to the canvas. You should see the following flow on your NiFi canvas:

34738-1-template-import.png

Enable Controller Services

Select the gear icon from the Operate Palette:

34739-2-flow-configuration.png

This opens the NiFi Flow Configuration window. Select the Controller Services tab:

34740-3-disabled-controller-services.png

Enable AvroSchemaRegistry by selecting its lightning bolt icon. This then allows you to enable the CSVReader and JsonRecordSetWriter controller services; select the lightning bolt icons for both of these services. All the controller services should now be enabled:

34741-4-enabled-controller-services.png

The flow is now ready to run.

Flow Overview

Here is a quick overview of the flow:

1. GetHTTP pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website

2. UnpackContent unzips the file

3. RouteOnAttribute passes only the movie title data on through the flow

4. UpdateAttribute adds the schema.name attribute, with the value "movies", to the flowfile

5. PublishKafkaRecord_0_10:

  • Converts the flowfile contents from CSV to JSON
  • Publishes the JSON data to the Kafka topic "Movies"

Flow Details

Let's look at each of the processors in the flow in detail:

Get Movie Data (GetHTTP Processor)

This processor pulls a zip file from the website MovieLens, a movie recommendation service. The dataset (ml-20m.zip) contains 20,000,263 ratings and 465,564 tag applications across 27,278 movies.

Looking at the processor's configuration:

34742-5-gethttp-configuration.png

Start the processor to retrieve the file:

34743-6-gethttp-start.png

Note: On the Scheduling tab, the Run Schedule is set to 10 min instead of the default 0 sec, so the processor only periodically checks whether the data file has been updated instead of checking constantly:

34744-7-gethttp-scheduling.png

Unzip (UnpackContent Processor)

The next processor is UnpackContent which unzips the "ml-20m.zip" file:

34745-8-unpackcontent-configuration.png

Running the processor unzips the archive into 7 separate files, six CSV files plus a README (movies.csv, ratings.csv, tags.csv, links.csv, genome-scores.csv, genome-tags.csv, and README.txt):

34746-9-unpackcontent-start.png
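UnpackContent does this inside NiFi. As a rough illustration of what "unpacking into separate files" means, here is a stdlib Python sketch that reads a zip archive from memory and returns each regular file's content keyed by its base filename. The unpack helper is hypothetical; it only loosely mirrors how each extracted entry becomes its own flowfile carrying a filename attribute.

```python
import io
import zipfile


def unpack(zip_bytes: bytes) -> dict[str, bytes]:
    """Return {base filename: content} for every regular file in the archive."""
    files = {}
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # skip directory entries such as "ml-20m/"
            # keep only the last path component, e.g. "ml-20m/movies.csv" -> "movies.csv"
            name = info.filename.rsplit("/", 1)[-1]
            files[name] = archive.read(info)
    return files
```

The sketch holds the whole archive and every entry in memory, which is fine for illustration; for a data set the size of ml-20m (its ratings.csv alone is large), a streaming approach would be preferable.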

RouteOnAttribute Processor

RouteOnAttribute is next. Looking at its configuration:

34747-10-routeonattribute-configuration.png

The processor routes flowfiles to different relationships depending on the filename (movies.csv, ratings.csv, tags.csv).

For the purposes of this demo, we are only interested in publishing the movie title data to Kafka. As such, we connect the "movies" relationship to the next processor (UpdateAttribute) and auto-terminate the others:

34748-11-routeonattribute-settings.png
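The routing decision can be sketched in a few lines of Python. The actual RouteOnAttribute properties are only visible in the screenshot, so the rule table below is an assumption: it encodes one relationship per filename, with "movies" (the one relationship named in this article) being the only one connected onward. The "ratings" and "tags" relationship names are hypothetical. Flowfiles that match no rule go to RouteOnAttribute's "unmatched" relationship.

```python
# Hypothetical rule table: relationship name -> filename that routes to it.
# Only "movies" is named in the article; "ratings" and "tags" are assumptions.
ROUTES = {
    "movies": "movies.csv",
    "ratings": "ratings.csv",
    "tags": "tags.csv",
}


def route(attributes: dict[str, str]) -> str:
    """Return the relationship a flowfile goes to, based on its filename attribute."""
    filename = attributes.get("filename")
    for relationship, expected_name in ROUTES.items():
        if filename == expected_name:
            return relationship
    # Files that match no rule (e.g. links.csv, README.txt) go to "unmatched"
    return "unmatched"


def forwarded(filenames: list[str]) -> list[str]:
    """Filenames that continue to UpdateAttribute; other relationships are auto-terminated."""
    return [name for name in filenames if route({"filename": name}) == "movies"]
```

For example, passing the seven unpacked filenames to forwarded leaves only movies.csv, which matches what the flow sends on to UpdateAttribute.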

Run the RouteOnAttribute processor to send only the movie title data:

34749-12-routeonattribute-start.png

Add Schema Name Attribute (UpdateAttribute Processor)

The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "movies" to the flowfile:

34750-13-updateattribute-configuration.png

Start the processor, and view the attributes of the flowfile to confirm this:

34751-14-updateattribute-flowfile-attributes.png

You can also confirm that the contents of the flowfile are in CSV format at this point in the flow:

34752-15-updateattribute-flowfile-contents.png

Publish to "Movies" Topic (PublishKafkaRecord_0_10 Processor)

The final processor is PublishKafkaRecord_0_10. Looking at its configuration:

34753-16-publishkafkarecord-configuration.png

The Kafka Brokers property is set to "localhost:9092" and the Topic Name property is set to "Movies". Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the schema for its output and writes the records as JSON.
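To make the conversion concrete, here is a stdlib Python sketch of roughly what the CSVReader/JsonRecordSetWriter pair does to each row of movies.csv, producing one JSON string per record (PublishKafkaRecord_0_10 sends each record as its own Kafka message). This is not NiFi's code: the function name is hypothetical, the sketch assumes the file has a header row, treats empty cells as null, and hard-codes the field types from the "movies" schema. NiFi's actual JSON formatting may differ.

```python
import csv
import io
import json

# (field name, Avro type, nullable) taken from the "movies" schema in this article
SCHEMA_FIELDS = [
    ("movieId", "long", False),
    ("title", "string", True),
    ("genres", "string", True),
]


def csv_to_json_messages(csv_text: str) -> list[str]:
    """Convert CSV with a header row into one compact JSON string per record."""
    messages = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        record = {}
        for name, avro_type, nullable in SCHEMA_FIELDS:
            value = row.get(name)
            if value is None or value == "":
                if not nullable:
                    raise ValueError(f"missing required field {name!r}")
                record[name] = None  # assumption: empty cell -> null
            elif avro_type == "long":
                record[name] = int(value)
            else:
                record[name] = value
        # compact separators: {"movieId":1,"title":...}
        messages.append(json.dumps(record, separators=(",", ":")))
    return messages
```

The csv module handles quoted titles that contain commas (several MovieLens titles do), which is why the sketch does not split lines on commas by hand.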

CSVReader Controller Service

Select the arrow icon next to "CSVReader", which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:

34754-17-csvreader-properties.png

With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader looks up its schema by name, and in this flow that name comes from the schema.name attribute. The Schema Registry property is set to the AvroSchemaRegistry controller service, which defines the "movies" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:

34755-18-avroschemaregistry-properties.png

The schema is defined as:

{
  "type": "record",
  "name": "MoviesRecord",
  "fields": [
    {"name": "movieId", "type": "long"},
    {"name": "title", "type": ["null", "string"]},
    {"name": "genres", "type": ["null", "string"]}
  ]
}
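The schema lookup and the type checks it implies can be sketched as follows. This is a hypothetical stdlib Python stand-in for AvroSchemaRegistry, not NiFi's implementation: a dictionary maps schema names to schema text, the flowfile's schema.name attribute selects the entry, and a small validator handles only the Avro types the "movies" schema uses (long, string, null, and unions written as JSON arrays). It also illustrates what goes wrong when the schema.name attribute is missing.

```python
import json

# Hypothetical in-memory registry: schema name -> schema text (the "movies" schema above)
REGISTRY = {
    "movies": """{
      "type": "record",
      "name": "MoviesRecord",
      "fields": [
        {"name": "movieId", "type": "long"},
        {"name": "title", "type": ["null", "string"]},
        {"name": "genres", "type": ["null", "string"]}
      ]
    }"""
}


def _is_long(value) -> bool:
    # Avro long is a signed 64-bit integer; bool is excluded because it subclasses int
    return isinstance(value, int) and not isinstance(value, bool) and -(2**63) <= value < 2**63


# Only the primitive types used by the "movies" schema are handled
PRIMITIVE_CHECKS = {
    "null": lambda value: value is None,
    "string": lambda value: isinstance(value, str),
    "long": _is_long,
}


def lookup_schema(attributes: dict) -> dict:
    """Resolve the schema named by the flowfile's schema.name attribute."""
    name = attributes.get("schema.name")
    if name is None:
        raise KeyError("flowfile has no schema.name attribute")
    if name not in REGISTRY:
        raise KeyError(f"no schema named {name!r} in the registry")
    return json.loads(REGISTRY[name])


def validate(record: dict, schema: dict) -> list[str]:
    """Return a list of problems; an empty list means the record fits the schema."""
    problems = []
    for field in schema["fields"]:
        # a union is a JSON array of types; a plain type is a single string
        types = field["type"] if isinstance(field["type"], list) else [field["type"]]
        value = record.get(field["name"])
        if not any(PRIMITIVE_CHECKS[t](value) for t in types):
            problems.append(f"{field['name']}: {value!r} does not match {field['type']}")
    return problems
```

Note that ["null", "string"] is what makes title and genres optional: a missing or null value satisfies the "null" branch of the union, while movieId must always be an integer.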

JsonRecordSetWriter Controller Service

Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:

34756-19-jsonrecordsetwriter-properties.png

Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.

See JSON in Kafka

Start the PublishKafkaRecord_0_10 processor and you will see the JSON movie data in your Kafka consumer window:

34757-20-kafka-consumer.png
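If you would rather check the messages programmatically than by eye, one option is to redirect the console consumer's output to a file and run it through a small script. The stdlib Python sketch below is hypothetical. It assumes each Kafka message is printed on a single line (no pretty-printing), and it accepts either a single JSON object or a JSON array of objects per message, since the exact framing of the writer's output is not shown in the text.

```python
import json


def check_lines(lines) -> tuple[int, list[str]]:
    """Count movie records in consumer output lines and collect lines that fail."""
    count, bad = 0, []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            bad.append(line)
            continue
        # Accept either one JSON object or a JSON array of objects per message
        records = parsed if isinstance(parsed, list) else [parsed]
        if all(isinstance(r, dict) and "movieId" in r for r in records):
            count += len(records)
        else:
            bad.append(line)
    return count, bad
```

For example, with the consumer output saved to a file (the name consumer_output.txt is hypothetical), `count, bad = check_lines(open("consumer_output.txt"))` reports how many records parsed and which lines did not.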

Helpful Links

Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:

Comments

Hi,

After this example publishes the data, how can I consume it from the Movies topic and insert it into a Cassandra table?

Is it also possible to create the needed Cassandra table directly in NiFi?

Could you please give me a solution as detailed as this guide? I'm completely new to NiFi.

@Andrew Lim

Hi, thanks for sharing this post.

Can you please give me a query to check whether the incoming data from NiFi is reaching Kafka? How can we verify it in Kafka?

Also, I am using GetSFTP to get the CSV data from a remote machine.

Hi @hema moger

Sorry if I'm misunderstanding your questions, but I think you're asking how to see the results in the last screenshot (20-kafka-consumer.png). The kafka command I used was:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic Movies --from-beginning

@Andrew Lim

Instead of --zookeeper, we can specify --bootstrap-server, right? Here you're giving an example of data from a website.

But I have to import the data from a remote (Linux) machine, and I have used GetSFTP. Is that the right option? Is there anything better?

Please suggest.

Hi @hema moger

Could you share your entire flow? I'm curious to see what comes before your PublishKafkaRecord processor. It looks like you haven't set your schema.name attribute.

@Andrew Lim

Can you help on this ?

@hema moger

A couple things:

  • What is your intent in using the PublishKafkaRecord processor? You set yours up to use both a CSVReader and a CSVRecordSetWriter, so there is no format conversion like in my flow, which converts CSV to JSON.
  • Did you get the flow from my article running? It seems like you are missing critical components. As I mentioned earlier, you aren't setting a schema.name attribute which is needed by your CSVRecordSetWriter.

@Andrew Lim

1) My requirement: we have CSV data on one of our remote machines, and every hour a large CSV file is generated there.

So I used GetSFTP for that. From there I need to put the data into Kafka topics, so I used PublishKafkaRecord. I have not used the JSON conversion described in the article above, but I am going to use it.

2) Yes, I got the flow from your article. I left out the schema.name attribute and the schema registry because I don't understand what to put in them. Do I need to list the column names from the input file?

3) I have just started learning Kafka. I created partitions in one topic, and I have no idea how to load data into specific partitions using NiFi.

If you have any suggestion better than this, can you guide me?