Objective

This tutorial walks you through a NiFi flow that uses the ConvertRecord processor and Record Reader/Writer controller services to convert a CSV file into JSON format. The flow is then modified to also convert the CSV file to Avro and XML formats.

Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0, so this tutorial requires NiFi 1.2.0 or later.

Environment

This tutorial was tested using the following environment and components:

  • Mac OS X 10.11.6
  • Apache NiFi 1.3.0

Convert CSV to JSON

Support Files

Here is a template of the flow discussed in this tutorial: convert-csv-to-json.xml

Here is the CSV file used in the flow: users.txt (Change the extension from .txt to .csv after downloading)

Note: The CSV data originated from the site, RandomUser. This useful site provides a free API to pull down randomly generated user data. For example: https://randomuser.me/api/0.6/?results=10&format=SQL.

Demo Configuration

Input and Output

Create two local directories: one for input and one for the JSON output. Place the "users.csv" file in the input directory.

21468-1-in-out.png

Import Template

Start NiFi. Import the provided template and add it to the canvas.

You should see the following flow on your NiFi canvas:

21470-2-template-added.png

Enable Controller Services

Select the gear icon from the Operate Palette:

21471-3-flow-configuration.png

This opens the NiFi Flow Configuration window. Select the Controller Services tab:

21474-4-controller-services-disabled.png

Enable the AvroSchemaRegistry by selecting its lightning bolt icon. This allows you to enable the CSVReader and JsonRecordSetWriter controller services; select the lightning bolt icon for each of them. All the controller services should now be enabled:

21475-5-controller-services-enabled.png

Update Directory Path in GetFile Processor

Change the Input Directory path in the GetFile processor to point to your local input directory:

21476-6-getfile-change-directory.png

Update Directory Path in PutFile Processor

Change the Directory path in the PutFile processor to point to your local output directory:

21477-7-putfile-directory.png

The flow is now ready to run.

Flow Overview

Here is a quick overview of the flow:

1. GetFile ingests a CSV file of user data from a local directory

2. UpdateAttribute adds Schema Name "users" as an attribute to the flowfile

3. ConvertRecord converts the flowfile contents from CSV to JSON by:

  • Using a CSVReader controller service that references a schema in an AvroSchemaRegistry controller service
  • The AvroSchemaRegistry contains a "users" schema which defines information about each record (field names, field ids, field types)
  • Using a JsonRecordSetWriter controller service that references the same AvroSchemaRegistry schema

4. UpdateAttribute adds the file name with the JSON extension as an attribute to the flowfile

5. PutFile writes the contents of the flowfile to a local directory

Note: Currently, there are two Schema Registry implementations: the local Avro-based Schema Registry controller service utilized in this demo flow and an external client, the Hortonworks Schema Registry.
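
The read-then-write idea behind steps 1 through 5 can be sketched outside NiFi in a few lines of plain Groovy. This is only an illustration, not what ConvertRecord does internally: the sample rows, the column names (id, first_name, last_name) and the usersSchema map are all hypothetical stand-ins, and the real users.csv and "users" schema have different fields.

```groovy
import groovy.json.JsonOutput

// Hypothetical sample data; the real users.csv has different columns.
def csvText = '''id,first_name,last_name
1,Ann,Lee
2,Bob,Ray
'''

// Hypothetical stand-in for the "users" schema: field name -> type converter.
def usersSchema = [
    id        : { String s -> s.toInteger() },
    first_name: { String s -> s },
    last_name : { String s -> s }
]

// Read step: the first line is the header, the remaining lines are records.
// A naive split on commas is enough here; it does not handle quoted fields.
def lines = csvText.readLines().findAll { it.trim() }
def header = lines.head().split(',')*.trim()
def records = lines.tail().collect { line ->
    def values = line.split(',')*.trim()
    def record = [:]
    header.eachWithIndex { name, i ->
        // Convert each raw string to the type the schema declares for that field.
        record[name] = usersSchema[name].call(values[i])
    }
    record
}

// Write step: serialize the same records as a JSON array.
def json = JsonOutput.toJson(records)
println JsonOutput.prettyPrint(json)
```

The point of the sketch is the separation of concerns: the reading half knows only about CSV, the writing half knows only about JSON, and the schema is what lets the two agree on field names and types.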

Flow Details

Let's look at each of the processors in the flow in detail:

Get CSV File (GetFile Processor)

FlowFiles are generated from the users.csv file in the local directory. All of the properties are set to default values, except for Input Directory, which we edited earlier.

Start the processor:

21478-8-getcsvfile-start.png

One flowfile is generated with the CSV data as the contents. Right click on the connection between the GetFile Processor and the UpdateAttribute Processor. In the context menu, select "List Queue" and click the View Details button ("i" icon):

21479-9-csvflowfile-viewdetails.png

From the FlowFile window that opens, select the "View" button from the Details tab:

21480-10-csvflowfile-view.png

The CSV contents of the flowfile are displayed:

21482-11-csvflowfile-contents.png

Add Schema Name Attribute (UpdateAttribute Processor)

The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "users" to the flowfile:

21483-12-updateattribute-schemaname-users.png

Start the processor, and view the attributes of the flowfile to confirm this:

21484-13-updateattribute-schemaname-attributes.png

ConvertRecord - CSVtoJSON (ConvertRecord Processor)

The next processor is ConvertRecord. Looking at its configuration, there are only two properties:

21485-14-convertrecord-properties.png

Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.

CSVReader Controller Service

Select the arrow icon next to "CSVReader", which opens the Controller Services list in the NiFi Flow Configuration window. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:

21486-15-csvreader-properties.png

With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader takes the name of the schema to use from a flowfile attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service, which defines the "users" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:

21487-16-avroschemaregistry-properties.png
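
As a rough mental model of this lookup, the plain Groovy sketch below treats the registry as a map from schema name to Avro schema text and resolves the schema through the schema.name attribute. It does not use any NiFi API. The schema shown is a hypothetical example (the record name UserRecord and the three fields are assumptions); the actual "users" schema is the one shown in the AvroSchemaRegistry properties.

```groovy
import groovy.json.JsonSlurper

// Stand-in for AvroSchemaRegistry: schema name -> Avro schema text.
// The field list is hypothetical; the tutorial's real "users" schema differs.
def schemaRegistry = [
    users: '''{
      "type": "record",
      "name": "UserRecord",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"}
      ]
    }'''
]

// Stand-in for the flowfile attributes after the UpdateAttribute step.
def attributes = ['schema.name': 'users']

// Read the schema name from the attribute, then fetch the schema by that name.
def schemaName = attributes['schema.name']
def schemaText = schemaRegistry[schemaName]
if (schemaText == null) {
    throw new IllegalStateException("No schema named '${schemaName}' in the registry")
}

// An Avro schema is JSON, so a JSON parser is enough to list its fields.
def schema = new JsonSlurper().parseText(schemaText)
def fieldNames = schema.fields*.name
println "Schema '${schemaName}' defines fields: ${fieldNames}"
```

Because the name travels with the flowfile as an attribute, the same reader can serve flowfiles that need different schemas, as long as each schema is registered under the name the attribute carries.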

JsonRecordSetWriter Controller Service

Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:

21488-17-jsonrecordsetwriter-properties.png

Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.

Start the ConvertRecord processor. The contents of the flowfile are now JSON:

21489-18-jsonflowfile-contents.png

Add JSON File Name Extension (UpdateAttribute Processor)

The next processor is another UpdateAttribute, which simply adds a JSON extension to the name of the original CSV file:

21490-19-updateattribute-jsonext-properties.png

Start the processor, and view the details of the flowfile to confirm this:

21491-20-updateattribute-filename-attributes.png

PutFile Processor

The last processor writes the JSON file to the local output directory that was set earlier in the configuration section of this article.

Start the processor and confirm the file has been saved:

21493-22-putfile-json-saved.png

Convert CSV to Avro & XML

Now that you have a flow that converts CSV files to JSON using the record-oriented processors and controller services, it is very easy to modify the ConvertRecord processor to convert the CSV to other data formats.

CSV to Avro: AvroRecordSetWriter

The ConvertRecord processor will continue to use the CSVReader, but now an AvroRecordSetWriter will be used for the Record Writer:

21496-23-convertrecord-csvtoavro.png

Setting up this controller service is very similar to the JsonRecordSetWriter. Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry:

21498-24-avrorecordsetwriter-properties.png

Running the processor now produces the flowfile with Avro contents:

21499-25-avroflowfile-contents.png

CSV to XML: ScriptedRecordSetWriter

Again, the ConvertRecord processor will continue to use the CSVReader, but now a ScriptedRecordSetWriter will be used for the Record Writer:

21500-26-convertrecord-csvtoxml.png

Looking at the properties for this controller service:

21501-27-scriptedrecordsetwriter-properties.png

The Script Engine is Groovy. The Script Body is:

import groovy.xml.MarkupBuilder
import java.io.IOException
import java.io.InputStream
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream
class GroovyRecordSetWriter implements RecordSetWriter {
    private int recordCount = 0;
    private final OutputStream out;
    public GroovyRecordSetWriter(final OutputStream out) {
        this.out = out;
    }
    @Override
    WriteResult write(Record r) throws IOException {
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).record {
                r.schema.fieldNames.each {fieldName ->
                    "$fieldName" r.getValue(fieldName)
                }
            }
        }
        recordCount++;
        WriteResult.of(1, [:])
    }
    @Override
    String getMimeType() {
        return 'application/xml'
    }
    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).recordSet {
                Record r
                while (r = rs.next()) {
                    count++
                    record {
                        rs.schema.fieldNames.each {fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }
    public void beginRecordSet() throws IOException {
    }
    @Override
    public WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:]);
    }
    @Override
    public void close() throws IOException {
    }
    @Override
    public void flush() throws IOException {
    }
}
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
    @Override
    public RecordSchema getSchema(FlowFile flowFile, InputStream inStream) throws SchemaNotFoundException, IOException {
        return null;
    }
    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out)
    }
}
writer = new GroovyRecordSetWriterFactory()

Note: The script body was taken from https://github.com/apache/nifi/blob/rel/nifi-1.3.0/nifi-nar-bundles/nifi-scripting-bundle/nifi-scri...
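
The part of the script that actually produces XML is the MarkupBuilder call, where a quoted field name is used as a method name to create an element of that name. The standalone sketch below isolates that technique without any NiFi classes. The user map is a hypothetical stand-in for a NiFi Record, and a StringWriter replaces the flowfile output stream.

```groovy
import groovy.xml.MarkupBuilder

// Hypothetical record: field name -> value (stands in for a NiFi Record).
def user = [first_name: 'Ann', last_name: 'Lee', city: 'Chennai']

def out = new StringWriter()
// Calling an undefined method on the builder creates an element with that name.
new MarkupBuilder(out).record {
    user.each { fieldName, value ->
        // A GString used as the method name yields an element named after the field.
        "$fieldName"(value)
    }
}

def xml = out.toString()
println xml
```

Each field becomes a child element of record, which is the same shape the script body produces for every record in the record set.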

Running the processor now produces the flowfile with XML contents:

21502-28-xmlflowfile-contents.png

Here is a template that converts the CSV file to JSON, Avro and XML: convert-csv-to-json-avro-xml.xml

Note: After importing this template, make sure the directory paths for the GetFile and PutFile processors exist, confirm users.csv is in the input directory and remember to enable all Controller Services before running the flow.

Helpful Links

Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:

Comments

Hi @Andrew Lim

I am having a problem with a NiFi data flow. Please guide me on how to resolve it.

I have a CSV file in which one column is in JSON format, like this (input):

Name : Surendra

Age : 25

Address :{city:Chennai,state:TN,zipcode:600343}

The required output looks like this:

Name and Age remain the same, but the Address fields should look like this:

Address_city : Chennai

Address_state : TN

Address_zipcode : 600343

Please advise on how to solve this using NiFi.

Hi @Surendra Shringi,

It is probably best to post a new question on HCC for your issue. I'm not sure how to address it, and posting there makes the entire HCC community aware of your problem so that others can help.

If you are using Apache NiFi 1.5.0, the script body used for the CSV to XML conversion needs to be updated/corrected with the following:

import groovy.xml.MarkupBuilder
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream

class GroovyRecordSetWriter implements RecordSetWriter {
    private int recordCount = 0;
    private final OutputStream out;

    public GroovyRecordSetWriter(final OutputStream out) {
        this.out = out;
    }

    @Override
    WriteResult write(Record r) throws IOException {
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).record {
                r.schema.fieldNames.each {fieldName ->
                    "$fieldName" r.getValue(fieldName)
                }
            }
        }

        recordCount++;
        WriteResult.of(1, [:])
    }

    @Override
    String getMimeType() {
        return 'application/xml'
    }

    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0

        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).recordSet {

                Record r
                while (r = rs.next()) {
                    count++

                    record {
                        rs.schema.fieldNames.each {fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }

    public void beginRecordSet() throws IOException {
    }

    @Override
    public WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:]);
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public void flush() throws IOException {
    }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out)
    }
}

writer = new GroovyRecordSetWriterFactory()

NiFi 1.7.0 introduced XML record readers and writers, so CSV to XML conversion with a ConvertRecord processor can now be done with the CSVReader and XMLRecordSetWriter controller services, as shown here:

https://community.hortonworks.com/content/kbentry/199310/xml-record-writer-in-apache-nifi-170.html

Hi @Andrew Lim, thanks for the detailed explanation. Following your article, I am trying to convert a CSV to JSON using the ConvertRecord processor and then load the merged JSON (the output of ConvertRecord) into Redshift using COPY from a file. My merged JSON is stored in S3. I am getting an error that the CSV is not in JSON format. Could you please suggest how to load these records into Redshift all at once?