Created on 07-31-2017 05:58 PM - edited 08-17-2019 11:47 AM
This tutorial walks you through a NiFi flow that uses the ConvertRecord processor and Record Reader/Writer controller services to easily convert a CSV file into JSON format. The flow is then extended to convert the CSV file to Avro and XML formats as well.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0, so this tutorial requires NiFi 1.2.0 or later.
This tutorial was tested using the following environment and components:
Here is a template of the flow discussed in this tutorial: convert-csv-to-json.xml
Here is the CSV file used in the flow: users.txt (Change the extension from .txt to .csv after downloading)
Note: The CSV data originated from RandomUser, a useful site that provides a free API for pulling down randomly generated user data. For example: https://randomuser.me/api/0.6/?results=10&format=SQL.
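For illustration only, a users.csv with a handful of hypothetical columns might look like the following (the actual file contains randomly generated user data, so its columns and rows will differ):

id,first_name,last_name,email
1,John,Doe,john.doe@example.com
2,Jane,Smith,jane.smith@example.com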
Create two local directories: one for input and one for the JSON output. Place the "users.csv" file in the input directory.
Start NiFi. Import the provided template and add it to the canvas.
You should see the following flow on your NiFi canvas:
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JsonRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
Change the Input Directory path in the GetFile processor to point to your local input directory:
Change the Directory path in the PutFile processor to point to your local output directory:
The flow is now ready to run.
Here is a quick overview of the flow:
1. GetFile ingests a CSV file of user data from a local directory
2. UpdateAttribute adds Schema Name "users" as an attribute to the flowfile
3. ConvertRecord converts the flowfile contents from CSV to JSON using the CSVReader and JsonRecordSetWriter controller services described below
4. UpdateAttribute adds the file name with the JSON extension as an attribute to the flowfile
5. PutFile writes the contents of the flowfile to a local directory
Note: Currently, there are two Schema Registry implementations: the local Avro-based Schema Registry controller service utilized in this demo flow and an external client, the Hortonworks Schema Registry.
Let's look at each of the processors in the flow in detail:
Get CSV File (GetFile Processor)
FlowFiles are generated from the users.csv file in the local directory. All of the properties are set to default values, except for Input Directory, which we edited earlier.
Start the processor:
One flowfile is generated with the CSV data as its contents. Right-click on the connection between the GetFile processor and the UpdateAttribute processor. In the context menu, select "List Queue" and click the View Details button ("i" icon):
From the FlowFile window that opens, select the "View" button on the Details tab to view the CSV contents of the flowfile:
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the schema.name attribute with the value of "users" to the flowfile:
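In UpdateAttribute this is simply a dynamic (user-added) property; the property name becomes the attribute name:

schema.name = users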
Start the processor, and view the attributes of the flowfile to confirm this:
ConvertRecord - CSVtoJSON (ConvertRecord Processor)
The next processor is ConvertRecord. Looking at its configuration, there are only two properties:
Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service looks up the same schema and uses it to write the data as JSON.
CSVReader Controller Service
Select the arrow icon next to "CSVReader", which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see its properties:
With the Schema Access Strategy property set to "Use 'Schema Name' Property", the reader expects the flowfile to carry the name of its schema in an attribute, which in this flow is schema.name. The Schema Registry property is set to the AvroSchemaRegistry controller service, which defines the "users" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
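For reference, the schema registered in the AvroSchemaRegistry is an Avro schema stored as the value of a property named "users". A minimal sketch, using the hypothetical columns from earlier rather than the template's actual field list, would look like:

{
  "name": "users",
  "namespace": "nifi",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "first_name", "type": "string" },
    { "name": "last_name", "type": "string" },
    { "name": "email", "type": "string" }
  ]
}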
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
Start the ConvertRecord processor. The contents of the flowfile are now JSON:
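With the hypothetical schema sketched above, the two sample CSV rows would come out of ConvertRecord shaped like this (JsonRecordSetWriter writes a record set as a JSON array):

[ {
  "id" : 1,
  "first_name" : "John",
  "last_name" : "Doe",
  "email" : "john.doe@example.com"
}, {
  "id" : 2,
  "first_name" : "Jane",
  "last_name" : "Smith",
  "email" : "jane.smith@example.com"
} ]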
Add JSON File Name Extension (UpdateAttribute Processor)
The next processor is another UpdateAttribute, which simply adds a JSON extension to the name of the original CSV file:
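One common way to configure this (an assumption about the template's exact settings, not a confirmed detail) is a dynamic property that rewrites the filename attribute using the NiFi Expression Language:

filename = ${filename}.json

This appends .json to the existing name; an expression like ${filename:substringBeforeLast('.csv')}.json would replace the extension instead.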
Start the processor, and view the details of the flowfile to confirm this:
PutFile Processor
The last processor places the JSON format file in a local directory as described earlier in the configuration section of this article.
Start the processor and confirm the file has been saved:
Now that you have a flow that converts CSV files to JSON using the record-oriented processors and controller services, it is very easy to modify the ConvertRecord processor to convert the CSV to other data formats.
The ConvertRecord processor will continue to use the CSVReader, but now an AvroRecordSetWriter will be used for the Record Writer:
Setting up this controller service is very similar to the JsonRecordSetWriter. Schema Write Strategy is set to "Set 'schema.name' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry:
Running the processor now produces the flowfile with Avro contents:
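Note that Avro is a binary format, so the raw flowfile contents will not be fully human-readable. Outside NiFi, a saved Avro file can be decoded with Apache avro-tools, for example:

java -jar avro-tools-1.8.2.jar tojson users.avro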
Again, the ConvertRecord processor will continue to use the CSVReader, but now a ScriptedRecordSetWriter will be used for the Record Writer:
Looking at the properties for this controller service:
The Script Engine is Groovy. The Script Body is:
import groovy.xml.MarkupBuilder

import java.io.IOException
import java.io.InputStream

import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream

class GroovyRecordSetWriter implements RecordSetWriter {

    private int recordCount = 0;
    private final OutputStream out;

    public GroovyRecordSetWriter(final OutputStream out) {
        this.out = out;
    }

    @Override
    WriteResult write(Record r) throws IOException {
        // Write a single record as an XML <record> element, one child element per field
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            new MarkupBuilder(osw).record {
                r.schema.fieldNames.each { fieldName ->
                    "$fieldName" r.getValue(fieldName)
                }
            }
        }
        recordCount++;
        WriteResult.of(1, [:])
    }

    @Override
    String getMimeType() {
        return 'application/xml'
    }

    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        // Write the whole record set as a <recordSet> root with one <record> element per record
        int count = 0
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            new MarkupBuilder(osw).recordSet {
                Record r
                while (r = rs.next()) {
                    count++
                    record {
                        rs.schema.fieldNames.each { fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }

    public void beginRecordSet() throws IOException {
    }

    @Override
    public WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:]);
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public void flush() throws IOException {
    }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    @Override
    public RecordSchema getSchema(FlowFile flowFile, InputStream inStream) throws SchemaNotFoundException, IOException {
        return null;
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out)
    }
}

// The ScriptedRecordSetWriter service picks up the factory assigned to the 'writer' variable
writer = new GroovyRecordSetWriterFactory()
Note: The script body was taken from https://github.com/apache/nifi/blob/rel/nifi-1.3.0/nifi-nar-bundles/nifi-scripting-bundle/nifi-scri...
Running the processor now produces the flowfile with XML contents:
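Based on the MarkupBuilder calls in the script (a recordSet root element, one record element per record, and one child element per field), the hypothetical sample data would be written roughly as:

<recordSet>
  <record>
    <id>1</id>
    <first_name>John</first_name>
    <last_name>Doe</last_name>
    <email>john.doe@example.com</email>
  </record>
  <record>
    <id>2</id>
    <first_name>Jane</first_name>
    <last_name>Smith</last_name>
    <email>jane.smith@example.com</email>
  </record>
</recordSet>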
Here is a template that converts the CSV file to JSON, Avro and XML: convert-csv-to-json-avro-xml.xml
Note: After importing this template, make sure the directory paths for the GetFile and PutFile processors exist, confirm users.csv is in the input directory and remember to enable all Controller Services before running the flow.
Here are some links to check out if you are interested in more information on the record-oriented processors and controller services in NiFi:
Created on 01-10-2018 06:41 PM
Hi @Andrew Lim
I am having a problem with a NiFi data flow. Please guide me on how to resolve it.
I have a CSV file that has one column in JSON format, like this (input):
Name : Surendra
Age : 25
Address :{city:Chennai,state:TN,zipcode:600343}
The output I need is like this:
Name and Age remain the same, but the Address field should be split like this:
Address_city : Chennai
Address_state : TN
Address_zipcode : 600343
Please advise on how to solve this using NiFi.
Created on 01-19-2018 07:59 PM
It's probably best to post a new question on HCC for your issue. I'm not sure how to address it, and posting separately would make the entire HCC community aware of your problem so they can help out.
Created on 01-19-2018 08:05 PM
If you are using Apache NiFi 1.5.0, the script body used for the CSV to XML conversion needs to be updated/corrected, because the RecordSetWriterFactory interface changed: getSchema now takes the flowfile's variables and a read schema instead of the FlowFile and an InputStream, and createWriter no longer takes a FlowFile. Use the following:
import groovy.xml.MarkupBuilder

import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream

class GroovyRecordSetWriter implements RecordSetWriter {

    private int recordCount = 0;
    private final OutputStream out;

    public GroovyRecordSetWriter(final OutputStream out) {
        this.out = out;
    }

    @Override
    WriteResult write(Record r) throws IOException {
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            new MarkupBuilder(osw).record {
                r.schema.fieldNames.each { fieldName ->
                    "$fieldName" r.getValue(fieldName)
                }
            }
        }
        recordCount++;
        WriteResult.of(1, [:])
    }

    @Override
    String getMimeType() {
        return 'application/xml'
    }

    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            new MarkupBuilder(osw).recordSet {
                Record r
                while (r = rs.next()) {
                    count++
                    record {
                        rs.schema.fieldNames.each { fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }

    public void beginRecordSet() throws IOException {
    }

    @Override
    public WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:]);
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public void flush() throws IOException {
    }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out)
    }
}

writer = new GroovyRecordSetWriterFactory()
Created on 06-26-2018 02:18 PM
NiFi 1.7.0 introduced XML Record Reader/Writers. So CSV to XML conversion using a ConvertRecord processor can now be done simply with CSVReader and XMLRecordSetWriter controller services as shown here:
https://community.hortonworks.com/content/kbentry/199310/xml-record-writer-in-apache-nifi-170.html
Created on 05-30-2019 01:11 PM
Hi @Andrew Lim, thanks for the detailed explanation. Following your article, I am trying to convert a CSV to JSON using a ConvertRecord processor and then load the merged JSON (the output of ConvertRecord) into Redshift using COPY from a file. My merged JSON is stored in S3. I am getting an error that the CSV is not in JSON format. Could you please suggest how to load these records all at once into Redshift?