Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Guru

Objective

This tutorial demonstrates how to use the PutElasticsearchHttpRecord processor to easily put data from a CSV file into Elasticsearch.

Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later.

Environment

This tutorial was tested using the following environment and components:

  • Mac OS X 10.11.6
  • Apache NiFi 1.3.0
  • Elasticsearch 2.3.3

PublishElasticsearchHttpRecord (CSVReader)

Demo Configuration

Elasticsearch

For my environment, I had Elasticsearch 2.3.3 installed.

Start Elasticsearch and assign cluster and node names: ./elasticsearch --cluster.name elasticsearch --node.name hcc

I like to use Elastic HQ to manage/monitor my cluster:

39381-1-elastic-hq.png

Initial Flow

One of the great things about the record-oriented functionality in NiFi is the ability to re-use Record Readers and Writers. In conjunction with the Record processors, it is quick and easy to change data formats and data destinations.

For example, let's assume you have the flow working from the article "Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter)".

Note: The template for that flow can be found in that article as well as step-by-step instructions on how to configure it.

39382-2-publishkafkarecord-flow.png

As currently configured, the flow:

1. Pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website.

2. Unzips the file.

3. Sends only the movie title information on in the flow.

4. Adds Schema Name "movies" as an attribute to the flowfile.

5. Uses PublishKafkaRecord_0_10 to convert the flowfile contents from CSV to JSON and publish to a Kafka topic.

Say instead of publishing that movie data to Kafka, you now want to put it in Elasticsearch. The following steps will demonstrate how to do that quickly and simply by replacing the PublishKafkaRecord processor with a PutElasticsearchRecord processor and re-using a CSVReader.

Elasticsearch Flow Setup

1. Delete the connection between the UpdateAttribute and PublishKafkaRecord_0_10 processors. Now delete the PublishKafkaRecord_0_10 processor or set it off to the side.

2. Add a PutElasticsearchHttpRecord to the canvas.

3. Connect the UpdateAttribute processor to the PutElasticsearchHttpRecord processor:

39384-3-putelasticsearch-canvas.png

4. Open the Configure dialog for the PutElasticsearchHttpRecord process. On the Settings tab, auto-terminate the "success" relationship and for the purposes of this demo, auto-terminate the "failure" relationship also.

5. On the canvas, make a "retry" relationship connection from the PutElasticsearchHttpRecord to itself.

6. On the Properties tab:

  • Add "http://127.0.0.1:9200" for the Elasticsearch URL property
  • Add "movies" for the Index property
  • Add "default" for the Type property
  • Since it and its schema was already defined for the original PublishKafka flow, simply select "CSVReader" for the Record Reader property.

39385-4-putelasticsearchhttprecord-configuration.png

The flow is ready to run.

39386-5-putelasticsearchhttprecord-flow-complete.png

Flow Results

Start the flow.

(Note: If you had run the original PublishKafka flow previously, don't forget to clear the state of the GetHTTP processor so that the movie data zip is retrieved again.)

39387-6-flow-running.png

The movie data is now in Elasticsearch:

39388-7-elasticsearch-cluster-overview.png

39389-8-movie-index-query.png

Helpful Links

Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:

6,182 Views