Introduction

Apache Parquet is a popular columnar file format used by several tools, including Spark. NiFi can easily convert data from formats such as Avro, CSV, or JSON to Parquet. This article explains how to convert JSON data to Parquet using the PutParquet processor.

Implementation

Define a schema for the source data

In this article I'll be using a JSON data source with the following structure:

{
"created_at" : "Tue Oct 10 21:47:12 CEST 2017",
"id_store" : 4,
"event_type" : "store capacity",
"id_transaction" : "6594277248900858122",
"id_product" : 319,
"value_product" : 1507664832649
}

Since we will be using a record-based processor, we need to define a schema for our data. We'll write it as an Avro schema, but it can be used with other data types as well: it is only a "common language" that lets us describe the structure of the data. The Avro schema for my data is the following (note that value_product is declared as a long, since values such as 1507664832649 don't fit in an int):

{
 "namespace": "nifi",
 "name": "store_event",
 "type": "record",
 "fields": [
  { "name": "created_at", "type": "string" },
  { "name": "id_store", "type": "int" },
  { "name": "event_type", "type": "string" },
  { "name": "id_transaction", "type": "string" },
  { "name": "id_product", "type": "int" },
  { "name": "value_product", "type": "int" }
 ]
}

Generate data for testing

For testing, I'll generate random dummy data using a GenerateFlowFile processor with the following configuration:

[Screenshot: GenerateFlowFile processor configuration]
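
As a rough sketch of such a configuration (the exact values in the original screenshot may differ), the sample record above can be used as a static payload via the Custom Text property; the values could also be randomized with NiFi Expression Language:

Data Format : Text
Custom Text : {"created_at":"Tue Oct 10 21:47:12 CEST 2017","id_store":4,"event_type":"store capacity","id_transaction":"6594277248900858122","id_product":319,"value_product":1507664832649}
Run Schedule (Scheduling tab) : e.g. 1 sec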

Convert JSON to Parquet

Now let's use a PutParquet processor to convert our data. PutParquet is a special record-based processor because of the particularities of the Parquet format. Since Parquet's API is based on the Hadoop Path object, and not on InputStreams/OutputStreams, NiFi doesn't generate a Parquet flow file directly. Instead, NiFi reads the data into records (in memory) and writes them as Parquet to an HDFS cluster. For this reason, we need to configure PutParquet with a Hadoop cluster, as we usually do for a PutHDFS processor.

[Screenshot: PutParquet processor configuration (part 1)]

[Screenshot: PutParquet processor configuration (part 2)]

  • Hadoop Configuration Resources: local paths to the core-site.xml and hdfs-site.xml files from our Hadoop cluster. You can use Ambari to easily download these files from your HDP cluster.
  • Record Reader: a JsonTreeReader that will be used to read our source data and convert it to records in memory. This record reader should be configured with the same schema and schema access strategy as PutParquet.
  • Directory: the HDFS directory where Parquet files will be written.
  • Schema Access Strategy: where to get the schema that will be used for the written data. For the sake of simplicity, I'll use the schema text property to define the schema. You can use a schema registry for a more governed solution.
  • Schema Text: the Avro schema that we defined in the previous section.
  • Other parameters: this processor has several parameters to help tune the Parquet conversion. I'll keep the default values since the details of the Parquet format are out of the scope of this article. A consolidated sketch of these settings follows below.
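
To make this concrete, here is a rough sketch of how these properties could be filled in, assuming the schema from the previous section and the /tmp/nifi target directory used later in this article (the configuration-file paths are hypothetical and depend on your cluster):

JsonTreeReader (controller service)
  Schema Access Strategy : Use 'Schema Text' Property
  Schema Text            : <the Avro schema defined above>

PutParquet
  Hadoop Configuration Resources : /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
  Record Reader                  : JsonTreeReader
  Directory                      : /tmp/nifi
  Schema Access Strategy         : Use 'Schema Text' Property
  Schema Text                    : <the Avro schema defined above>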

Complete flow

Let's connect the different processors and start data generation/conversion.

[Screenshot: complete NiFi flow (GenerateFlowFile connected to PutParquet)]

Results

As discussed before, PutParquet writes Parquet data directly into HDFS. Let's check the /tmp/nifi directory (for instance with hdfs dfs -ls /tmp/nifi) to see the generated data. Note that the data coming out of this processor is still the original JSON data. If the resulting Parquet files are needed in the remainder of the flow, NiFi should pull them back from HDFS using ListHDFS/FetchHDFS.

[Screenshots: generated Parquet files in /tmp/nifi on HDFS]

Now let's try to read the data from HDFS to check that we have all the information in the right format. There are several ways to do this; what I like to do is start a Spark shell and read the content of the file, since Spark has very good built-in support for Parquet.

Start a spark-shell session and run the following code:

val myParquet = sqlContext.read.parquet("/tmp/nifi/748458744258259")
myParquet.show()
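
On Spark 2.x, the same check can be done through the SparkSession entry point; the path below reuses the example file name from this run, so adjust it to whatever PutParquet produced in your directory:

// Read the Parquet file written by NiFi and inspect its schema and contents
val myParquet = spark.read.parquet("/tmp/nifi/748458744258259")
myParquet.printSchema()
myParquet.show()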

As you can see in the screenshot below, we got the same schema and data as in our initial JSON data.

[Screenshot: Spark shell output showing the schema and the data]

If you want to convert data other than JSON, you can use the same process with another RecordReader, such as the Avro or CSV record readers.
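
For instance, with a CSVReader the sample record used in this article could be fed in as a header line plus a data line (a minimal sketch; the reader would be configured with the same Avro schema and schema access strategy):

created_at,id_store,event_type,id_transaction,id_product,value_product
Tue Oct 10 21:47:12 CEST 2017,4,store capacity,6594277248900858122,319,1507664832649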

Comments

Hi, thanks for your tutorial.

Do you know if PutParquet can deal with "date" and "timestamp" types, or does it depend on the Avro schema?

So the answer is NO.


Hi @mayki wogno

I didn't test it, but you should be able to do it. At least the RecordReader supports it: https://community.hortonworks.com/questions/113959/use-nifi-to-change-the-format-of-numeric-date-and...


Any recommendations on converting XML to Parquet?


What if I wanted to put my Parquet files into S3 instead of HDFS?
