Parquet is a popular columnar file format supported by many tools, including Spark. NiFi can easily convert data from formats such as Avro, CSV, or JSON to Parquet. This article explains how to convert JSON data to Parquet using the PutParquet processor.
In this article I'll be using a JSON data source with the following structure:
{
  "created_at" : "Tue Oct 10 21:47:12 CEST 2017",
  "id_store" : 4,
  "event_type" : "store capacity",
  "id_transaction" : "6594277248900858122",
  "id_product" : 319,
  "value_product" : 1507664832649
}
Since we will be using a record-based processor, we need to define a schema for our data. This will be an Avro schema, but it can be used with other data formats as well: it is only "a common language" that helps us describe a schema. The Avro schema for my data is the following:
{
  "namespace": "nifi",
  "name": "store_event",
  "type": "record",
  "fields": [
    { "name": "created_at", "type": "string" },
    { "name": "id_store", "type": "int" },
    { "name": "event_type", "type": "string" },
    { "name": "id_transaction", "type": "string" },
    { "name": "id_product", "type": "int" },
    { "name": "value_product", "type": "long" }
  ]
}
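The choice between "int" and "long" matters for the numeric fields: Avro's "int" is a 32-bit signed integer and "long" is a 64-bit one, and the sample value_product (1507664832649) is larger than the 32-bit maximum. Here is a minimal Scala sketch of that check, which can be pasted into a Scala REPL or a spark-shell. The helper name avroIntegerType is hypothetical and only for illustration; it is not part of Avro or NiFi.

```scala
// Avro "int" is a 32-bit signed integer; Avro "long" is a 64-bit signed integer.
// Hypothetical helper: returns the narrowest of the two types that can hold the value.
def avroIntegerType(value: Long): String =
  if (value >= Int.MinValue && value <= Int.MaxValue) "int" else "long"

// Numeric values taken from the sample JSON record in this article.
val sample: Map[String, Long] = Map(
  "id_store" -> 4L,
  "id_product" -> 319L,
  "value_product" -> 1507664832649L
)

// Print the suggested Avro type for each numeric field.
sample.foreach { case (field, value) =>
  println(s"$field -> ${avroIntegerType(value)}")
}
```

With the sample record, id_store and id_product fit in "int", while value_product needs "long".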
For testing, I'll generate random dummy data using a GenerateFlowFile processor with the following configuration:
Now let's use a PutParquet processor to convert our data. PutParquet is a special record-based processor because of the specificities of the Parquet format. Since Parquet's API is based on the Hadoop Path object, and not on InputStreams/OutputStreams, NiFi doesn't generate a Parquet flow file directly. Instead, NiFi takes data in record format (in memory) and writes it as Parquet to an HDFS cluster. For this reason, we need to configure PutParquet with a Hadoop cluster, as we usually do for PutHDFS.
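To illustrate what "based on the Hadoop Path object" means, here is a minimal Scala sketch that writes a single record with the parquet-avro library. It is not NiFi's implementation, only an illustration of the Path-based API. It assumes parquet-avro and the Hadoop client libraries are on the classpath; the two-field schema is a reduced, hypothetical version of the one above, and the file name example.parquet is a placeholder.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

// Reduced, hypothetical version of the article's schema (two fields only).
val schema: Schema = new Schema.Parser().parse(
  """{ "namespace": "nifi", "name": "store_event", "type": "record",
    |  "fields": [ { "name": "id_store", "type": "int" },
    |              { "name": "event_type", "type": "string" } ] }""".stripMargin)

// One in-memory record, built from values in the sample JSON.
val record = new GenericData.Record(schema)
record.put("id_store", Int.box(4))
record.put("event_type", "store capacity")

// The writer is opened on a Hadoop Path, not on an OutputStream.
// The file name is a placeholder; with a default Hadoop configuration
// the path resolves to the local file system rather than to HDFS.
val writer = AvroParquetWriter
  .builder[GenericRecord](new Path("/tmp/nifi/example.parquet"))
  .withSchema(schema)
  .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
  .build()

// Write the record and always close the writer so the file footer is flushed.
try writer.write(record) finally writer.close()
```

Because the writer needs a file system location rather than a stream, a processor built on this API has to write to a file system such as HDFS instead of producing the Parquet bytes as flow file content.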
Let's connect the different processors and start data generation/conversion.
As discussed before, PutParquet writes Parquet data directly into HDFS. Let's check /tmp/nifi to see the generated data. Note that the data coming out of this processor is still the original JSON data. If the resulting Parquet files are needed in the rest of the flow, NiFi should pull them from HDFS using List/FetchHDFS.
Now let's try to read the data in HDFS to check that we have all the information in the right format. There are several ways to do it. What I like to do is start a Spark shell and try to read the content of my file. Spark has very good built-in support for Parquet.
Start a spark-shell session and run the following code:
val myParquet = sqlContext.read.parquet("/tmp/nifi/748458744258259")
myParquet.show()
As you can see in the screenshot below, we got the same schema and data as in our initial JSON data.
If you want to convert data other than JSON, you can use the same process with another RecordReader, such as the Avro or CSV record reader.
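For a slightly more explicit check, the same spark-shell session can list the target directory and print the schema and row count. This is a sketch under a few assumptions: sc and sqlContext are the objects spark-shell provides, /tmp/nifi holds only the Parquet files written by PutParquet, and the whole directory is read instead of a single file. The name events is arbitrary.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// `sc` and `sqlContext` are provided by spark-shell.
// List what PutParquet wrote to the target directory.
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.listStatus(new Path("/tmp/nifi")).foreach { status =>
  println(s"${status.getPath} (${status.getLen} bytes)")
}

// Read the whole directory rather than a single file, then inspect it.
val events = sqlContext.read.parquet("/tmp/nifi")
events.printSchema()                      // column names and types stored in the Parquet files
println(s"rows: ${events.count()}")       // number of records converted so far
events.select("id_store", "event_type", "value_product").show(5)
```

printSchema() is a quick way to compare the types stored in the Parquet files against the Avro schema used by the record reader.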
Created on 12-06-2017 03:13 PM
Hi, thanks for your tutorial.
Do you know if PutParquet can deal with the "date" and "timestamp" types, or does it depend on the Avro schema?
So the answer is NO..
Created on 12-09-2017 05:25 PM
Hi @mayki wogno
I didn't test it, but you should be able to do it. At least the RecordReader supports it: https://community.hortonworks.com/questions/113959/use-nifi-to-change-the-format-of-numeric-date-and...
Created on 02-20-2018 01:51 PM
Any recommendations on converting XML to Parquet?
Created on 06-04-2019 09:52 AM
What if I wanted to put my Parquet into S3 instead of HDFS?