Created on 10-10-2017 08:53 PM - edited on 02-15-2020 11:31 AM by lwang
Parquet is a popular columnar file format supported by many tools, such as Spark. NiFi makes it easy to convert data from formats such as Avro, CSV, or JSON to Parquet. This article explains how to convert JSON data to Parquet using the PutParquet processor.
Define a schema for the source data
In this article I'll be using a JSON data source with the following structure:
Since we will be using a record-based processor, we need to define a schema for our data. This will be an Avro schema, but it can describe data in other formats as well. It's only "a common language" that helps us describe a schema. The Avro schema for my data is the following:
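To get a feel for the format, here is a minimal sketch with a hypothetical two-field schema. The record name and field names are illustrative assumptions and are not the ones used in this article. It assumes the Avro library is on the classpath (as is typically the case in a spark-shell) and simply parses the schema text to check that it is valid before it is pasted into NiFi.

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._

// Hypothetical schema text: record and field names are illustrative only.
val schemaText =
  """{
    |  "type": "record",
    |  "name": "ExampleRecord",
    |  "fields": [
    |    {"name": "id", "type": "int"},
    |    {"name": "name", "type": "string"}
    |  ]
    |}""".stripMargin

// Parsing fails with an exception if the schema text is not valid Avro.
val schema = new Schema.Parser().parse(schemaText)

// Print each field name with its Avro type.
schema.getFields.asScala.foreach(f => println(s"${f.name}: ${f.schema.getType}"))
```

The same schema text can then be used by both the record reader and PutParquet.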
For testing, I'll generate random dummy data using a GenerateFlowFile processor with the following configuration:
Convert JSON to Parquet
Now let's use a PutParquet processor to convert our data. PutParquet is a special record-based processor because of the specifics of the Parquet format. Since Parquet's API is based on the Hadoop Path object rather than on InputStreams/OutputStreams, NiFi doesn't generate a Parquet flow file directly. Instead, NiFi reads the data into record format (in memory) and writes it as Parquet to an HDFS cluster. For this reason, we need to configure PutParquet with a Hadoop cluster, as we usually do for PutHDFS.
Hadoop Configuration Resources: local paths to the core-site.xml and hdfs-site.xml files of our Hadoop cluster. You can use Ambari to easily download these files from your HDP cluster.
RecordReader: a JsonTreeReader that will be used to read our source data and convert it to record format in memory. This record reader should be configured with the same schema and schema access strategy as PutParquet.
Directory: an HDFS directory where Parquet files will be written
Schema Access Strategy: where to get the schema that will be used for the written data. For the sake of simplicity, I'll use the schema text property to define the schema. You can use a schema registry for a more governed solution.
Schema text: the Avro schema that we defined in the previous section
Other parameters: this processor has several parameters to help tune the Parquet conversion. I'll keep the default values, since the details of the Parquet format are out of the scope of this article.
Let's connect the different processors and start data generation/conversion.
As discussed before, PutParquet writes Parquet data directly into HDFS. Let's check in /tmp/nifi to see the generated data. Note that the flow file coming out of this processor still contains the original JSON data. If the resulting Parquet files are needed in the rest of the flow, NiFi has to pull them from HDFS using ListHDFS/FetchHDFS.
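One possible way to check the directory from a Scala REPL is to list it through the Hadoop FileSystem API. This is only a sketch: the helper name listFiles is my own, and it assumes that the Hadoop client libraries and the cluster's core-site.xml and hdfs-site.xml are on the classpath (for example in a spark-shell on a cluster node), so that the path resolves to HDFS rather than to the local file system.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: returns the full paths of the plain files in `dir`.
def listFiles(dir: String, conf: Configuration = new Configuration()): Seq[String] = {
  val path = new Path(dir)
  // Resolve the file system (HDFS or local) from the path and the configuration.
  val fs = path.getFileSystem(conf)
  fs.listStatus(path).filter(_.isFile).map(_.getPath.toString).toSeq
}
```

Calling `listFiles("/tmp/nifi").foreach(println)` should then print one entry per file written by PutParquet.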
Now let's try to read the data in HDFS to check that we have all the information in the right format. There are several ways to do it. What I like to do is start a Spark shell and try to read the content of my file. Spark has very good built-in support for Parquet.
Start a spark-shell session and run the following code:
val myParquet = sqlContext.read.parquet("/tmp/nifi/748458744258259")
As you can see in the screenshot below, the schema and data match our initial JSON data.
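To inspect the result in the shell, something along these lines can be used. It is a minimal sketch, assuming a spark-shell session where `sqlContext` is predefined, as in the code above. The helper name inspectParquet and its rows parameter are my own and not part of Spark.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: loads a Parquet file and prints its schema and first rows.
def inspectParquet(sqlContext: SQLContext, path: String, rows: Int = 5): DataFrame = {
  val df = sqlContext.read.parquet(path)
  df.printSchema() // column names and types stored in the Parquet file
  df.show(rows)    // first `rows` records in tabular form
  df
}
```

For the file from this article, the call would be `inspectParquet(sqlContext, "/tmp/nifi/748458744258259")`. The printed schema can then be compared with the Avro schema configured in PutParquet.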
If you want to convert data other than JSON, you can use the same process with another record reader, such as the Avro or CSV record reader.