Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Guru

Objective

This tutorial demonstrates how to use the PutMongoRecord processor to easily put data from a CSV file into MongoDB.

Note: The PutMongoRecord processor was introduced in NiFi 1.4.0. As such, the tutorial needs to be done running Version 1.4.0.

Environment

This tutorial was tested using the following environment and components:

  • Mac OS X 10.11.6
  • Apache NiFi 1.4.0
  • MongoDB 3.4.9

PutMongoRecord (CSVReader)

Demo Configuration

MongoDB

For my environment, I had a local MongoDB 3.4.9 instance installed.

Start MongoDB and create a database "hcc" and a collection "movies" for use with this tutorial:

>use hcc

switched to db hcc

> db.createCollection("movies")

{ "ok" : 1 }

> show collections

movies

I like to use Robot 3T to manage/monitor my MongoDB instance:

40831-1-robo3t.png

Initial Flow

A powerful feature about the record-oriented functionality in NiFi is the ability to re-use Record Readers and Writers. In conjunction with the Record processors, it is quick and easy to change data formats and data destinations.

For example, let's assume you have the flow working from the article "Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter)" with all of the necesary controller services enabled.

40832-2-publishkafkarecord-flow.png

(Note: The template for this flow can be found in the article as well as step-by-step instructions on how to configure it.)

As currently configured, the flow:

1. Pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website.

2. Unzips the file.

3. Sends only the movie title information on in the flow.

4. Adds Schema Name "movies" as an attribute to the flowfile.

5. Uses PublishKafkaRecord_0_10 to convert the flowfile contents from CSV to JSON and publish to a Kafka topic.

Instead of publishing that movie data to Kafka, we now want to put it in MongoDB. The following steps will demonstrate how to do that quickly and simply by replacing the PublishKafkaRecord processor with a PutMongoRecord processor and re-using the CSVReader that references an Avro Schema Registry where the movies schema is defined.

PutMongoRecord Flow Setup

1. Delete the connection between the UpdateAttribute and PublishKafkaRecord_0_10 processors. Now delete the PublishKafkaRecord_0_10 processor or set it off to the side.

2. Add a PutMongoRecord to the canvas.

3. Connect the UpdateAttribute processor to the PutMongoRecord processor:

40833-3-putmongorecord-canvas.png

4. Open the Configure dialog for the PutMongoRecord process. On the Settings tab, auto-terminate the "success" relationship.

5. On the canvas, make a "failure" relationship connection from the PutMongoRecord to itself.

6. On the Properties tab:

  • Add "mongodb://localhost:27017" for the Mongo URI property
  • Add "hcc" for the Mongo Database Name property
  • Add "movies" for the Mongo Collection Name property
  • Since it and related schema were already defined for the original PublishKafka flow, simply select "CSVReader" for the Record Reader property.

40834-4-putmongorecord-configuration.png

Select "Apply".

The flow is ready to run.

40835-5-putrecordmongo-flow-complete.png

Flow Results

Start the flow.

(Note: If you had run the original PublishKafka flow previously, don't forget to clear the state of the GetHTTP processor so that the movie data zip is retrieved again.)

40836-6-flow-running.png

The movie data is now in MongoDB:

40838-7-mongodb-documents.png

Helpful Links

Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:

8,031 Views
Comments
avatar
New Contributor

Hi Andrew, I have used PutMongoRecord for nested JSON object, the moment my JSON has nested structure it fails with an error org.bson.codecs.configuration.CodecConfigurationException and the insertion to mongodb fails. Please find the AvroSchema in the attached image. I could get the JSON validated against the schema correctly.

56517-screen-shot-2018-01-30-at-001446.png

Attached the stack trace for the error as well.

56518-screen-shot-2018-01-30-at-001745.png

Am i missing anything on the AvroSchema Registry config or any other config