Member since 01-11-2016
355 Posts
230 Kudos Received
74 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 8292 | 06-19-2018 08:52 AM |
| | 3215 | 06-13-2018 07:54 AM |
| | 3668 | 06-02-2018 06:27 PM |
| | 3966 | 05-01-2018 12:28 PM |
| | 5507 | 04-24-2018 11:38 AM |
10-10-2017
09:14 PM
@Mohamed Ashraf Following your question, I wrote an article on how to use PutParquet to convert data. Check it out to get a better understanding of the process: https://community.hortonworks.com/articles/140422/convert-data-from-jsoncsvavro-to-parquet-with-nifi.html I hope this helps.
10-10-2017
08:53 PM
5 Kudos
Introduction
Parquet is a popular columnar file format used with several tools such as Spark. NiFi can easily convert data from formats such as Avro, CSV or JSON to Parquet. This article explains how to convert data from JSON to Parquet using the PutParquet processor.
Implementation
Define a schema for the source data
In this article I'll be using a JSON data source with the following structure:
{
"created_at" : "Tue Oct 10 21:47:12 CEST 2017",
"id_store" : 4,
"event_type" : "store capacity",
"id_transaction" : "6594277248900858122",
"id_product" : 319,
"value_product" : 1507664832649
}
Since we will be using a record-based processor, we need to define a schema for our data. It is written as an Avro schema, but it can describe data in other formats as well: Avro is only a "common language" that helps us describe the structure of the records. The Avro schema for my data is the following:
{
"namespace": "nifi",
"name": "store_event",
"type": "record",
"fields": [
{ "name": "created_at", "type": "string" },
{ "name": "id_store", "type": "int" },
{ "name": "event_type", "type": "string" },
{ "name": "id_transaction", "type": "string" },
{ "name": "id_product", "type": "int" },
{ "name": "value_product", "type": "long" }
]
}
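A quick way to sanity-check the numeric types chosen in a schema like this is to compare the sample values against the ranges of Avro's int (32-bit signed) and long (64-bit signed) types. The sketch below is plain Python and is not part of NiFi; the helper name smallest_avro_type is hypothetical, and the record is the sample shown earlier in this article.

```python
# Sample record copied from the article.
record = {
    "created_at": "Tue Oct 10 21:47:12 CEST 2017",
    "id_store": 4,
    "event_type": "store capacity",
    "id_transaction": "6594277248900858122",
    "id_product": 319,
    "value_product": 1507664832649,
}

INT_MIN, INT_MAX = -2**31, 2**31 - 1    # Avro "int": 32-bit signed integer
LONG_MIN, LONG_MAX = -2**63, 2**63 - 1  # Avro "long": 64-bit signed integer


def smallest_avro_type(value):
    """Return the narrowest Avro primitive type able to hold value (hypothetical helper)."""
    if isinstance(value, bool):  # bool is a subclass of int, so test it first
        return "boolean"
    if isinstance(value, int):
        if INT_MIN <= value <= INT_MAX:
            return "int"
        if LONG_MIN <= value <= LONG_MAX:
            return "long"
        raise ValueError("integer does not fit in an Avro long: %r" % value)
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError("unsupported value type: %r" % type(value))


# Map every field of the sample record to the narrowest type that fits it.
suggested = {name: smallest_avro_type(value) for name, value in record.items()}
for name, avro_type in suggested.items():
    print(name, "->", avro_type)
```

A single sample only gives a lower bound on the type: a field that fits in an int today may still need a long if later values are larger.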
Generate data for testing
For testing, I'll generate random dummy data using a GenerateFlowFile processor with the following configuration
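If you want to produce similar test records outside NiFi, the Python sketch below generates random events with the same field names as the sample above. It is not the GenerateFlowFile configuration used in this article; the value ranges are assumptions, value_product is assumed to be an epoch timestamp in milliseconds (which is what the sample value looks like), and generate_event is a hypothetical helper.

```python
import json
import random
import time


def generate_event(rng):
    """Build one random event shaped like the sample record (hypothetical helper)."""
    return {
        # Same textual layout as the sample, e.g. "Tue Oct 10 21:47:12 CEST 2017".
        "created_at": time.strftime("%a %b %d %H:%M:%S %Z %Y"),
        "id_store": rng.randint(1, 10),          # assumed range
        "event_type": "store capacity",          # only value shown in the article
        "id_transaction": str(rng.randrange(0, 2**63)),
        "id_product": rng.randint(1, 1000),      # assumed range
        "value_product": int(time.time() * 1000),  # assumed: epoch milliseconds
    }


rng = random.Random(42)  # fixed seed so the random fields are reproducible
event = generate_event(rng)
print(json.dumps(event, indent=2))
```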
Convert JSON to Parquet
Now let's use a PutParquet processor to convert our data. PutParquet is a special record-based processor because of the specificities of the Parquet format. Since Parquet's API is based on the Hadoop Path object, and not on InputStreams/OutputStreams, NiFi doesn't generate a Parquet flow file directly. Instead, NiFi takes data in record format (in memory) and writes it as Parquet to an HDFS cluster. For this reason, we need to configure PutParquet with a Hadoop cluster, as we usually do for PutHDFS.
Hadoop Configuration Resources: a local path for core-site.xml and hdfs-site.xml files from our Hadoop cluster. You can use Ambari to easily download these files from your HDP cluster.
RecordReader: a JsonTreeReader that will be used to read our source data and convert it to record format in memory. This record reader should be configured with the same schema and schema access strategy as PutParquet.
Directory: an HDFS directory where Parquet files will be written
Schema Access Strategy: where to get the schema that will be used for the written data. For the sake of simplicity, I'll use the Schema Text property to define the schema. You can use a schema registry for a more governed solution.
Schema Text: the Avro schema that we defined in the previous section
Other parameters: this processor has several parameters to help tune the Parquet conversion. I'll keep the default values since the details of the Parquet format are out of the scope of this article.
Complete flow
Let's connect the different processors and start data generation/conversion.
Results
As discussed before, PutParquet writes Parquet data directly into HDFS. Let's check /tmp/nifi to see the generated data. Note that the flow file coming out of this processor still contains the original JSON data. If the resulting Parquet files are required for the remainder of the flow, NiFi should pull them from HDFS using List/FetchHDFS.
Now let's try to read the data in HDFS to check that we have all the information in the right format. There are several ways to do it. What I like to do is start a Spark shell and read the content of my file. Spark has very good built-in support for Parquet.
Start a spark-shell session and run the following code:
val myParquet = sqlContext.read.parquet("/tmp/nifi/748458744258259")
myParquet.show()
As you can see in the screenshot below, we got the same schema and data from our initial JSON data.
If you want to convert data other than JSON, you can use the same process with another RecordReader, such as the Avro or CSV record reader.
10-10-2017
05:39 PM
2 Kudos
@Hameed Abdul This is the expected behavior. Your file name is local to your client environment, and NiFi has no clue where it comes from. You need to send this information together with your file. The number that you are seeing is the ID that NiFi generated for the received flow file. How to send this information depends on your client. If I use curl to upload data, I can do:
curl --form "fileupload=@/tmp/file1.txt;filename=file1.txt" localhost:7878
and I receive the following in NiFi:
--------------------------2c1843649b5760e1
Content-Disposition: form-data; name="fileupload"; filename="file1.txt"
Content-Type: text/plain

this is file 1
--------------------------2c1843649b5760e1--
You can use ExtractGrok or ExtractText to get the filename and update the flow file attribute.
EDIT: As I said, this depends on your client. I did the same test with Postman, and here's how to get the right information. Add a header to your POST query; I added a header called originalfilename. I get this information as an attribute of my flow file. With this, you only need to use an UpdateAttribute processor to copy this attribute to filename. Hope this helps.
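To illustrate the extraction step, here is a minimal Python sketch that pulls the filename out of a multipart body like the one above with a regular expression. It only shows the matching logic; it is not NiFi code. The function name extract_filename is hypothetical, and a similar pattern could serve as a starting point for an ExtractText property, to be tested against your own payloads.

```python
import re

# Multipart body as received in the example above (CRLF line endings assumed).
body = (
    "--------------------------2c1843649b5760e1\r\n"
    'Content-Disposition: form-data; name="fileupload"; filename="file1.txt"\r\n'
    "Content-Type: text/plain\r\n"
    "\r\n"
    "this is file 1\r\n"
    "--------------------------2c1843649b5760e1--\r\n"
)

# Capture whatever sits between the quotes of the filename parameter.
FILENAME_RE = re.compile(r'filename="([^"]+)"')


def extract_filename(multipart_body):
    """Return the first filename found in the body, or None (hypothetical helper)."""
    match = FILENAME_RE.search(multipart_body)
    return match.group(1) if match else None


print(extract_filename(body))
```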
10-10-2017
02:28 PM
@Mohamed Ashraf I can't test your scenario right now, but PutParquet should write the Parquet file directly to HDFS, so there is no need for PutHDFS. What do you have in /user/nifi? Which directory have you configured in PutHDFS?
10-10-2017
02:21 PM
@Simon Jespersen happy to help 🙂 Thanks
10-10-2017
06:28 AM
@Mohamed Ashraf What configuration are you using? What error do you get?
10-08-2017
09:51 PM
1 Kudo
@Mohamed Ashraf Have you tried PutParquet? https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-parquet-nar/1.4.0/org.apache.nifi.processors.parquet.PutParquet/index.html It's a record-based processor that you can use to read a CSV file with CSVReader and write it as Parquet. I hope this helps.
10-08-2017
02:28 PM
@thenewbee Which credentials are you using? The admin/admin account is disabled by default on the sandbox. Use raj_ops/raj_ops or maria_dev/maria_dev instead. If you want to use admin, you can run the following command over SSH to enable it: ambari-admin-password-reset
10-08-2017
02:22 PM
2 Kudos
@Simon Jespersen Try this approach:
- Fetch only the JSON files (data1_control.json). Use a filter regex for this.
- Use EvaluateJsonPath to get the md5 into an attribute hash1.
- Use UpdateAttribute to generate the name of the data file and store it in an attribute file_to_get. Since you have the control file name (data1_control.json), you can generate the data file name (data1.xml) using NiFi Expression Language.
- In the same flow, fetch the corresponding data file file_to_get with a fetch processor. Now you have the content of this file in your flow file.
- Use HashContent to get the md5 and store it in an attribute hash2.
- Use RouteOnAttribute to keep only the flow files where hash1 equals hash2.
I hope this helps.
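The logic of these steps can be sketched in plain Python, which may help when checking files by hand before building the flow. This is not NiFi code: the helper names are hypothetical, and the JSON key "md5" inside the control file is an assumption, since the exact structure of the control file was not given.

```python
import hashlib
import json
import os
import tempfile


def data_file_for(control_name):
    """Derive the data file name from the control file name (data1_control.json -> data1.xml)."""
    suffix = "_control.json"
    if not control_name.endswith(suffix):
        raise ValueError("not a control file: %s" % control_name)
    return control_name[: -len(suffix)] + ".xml"


def md5_of(path):
    """Compute the MD5 hex digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def is_valid(directory, control_name, md5_key="md5"):
    """Return True when the hash in the control file (hash1) matches the data file (hash2)."""
    with open(os.path.join(directory, control_name)) as handle:
        hash1 = json.load(handle)[md5_key]  # assumed key name
    hash2 = md5_of(os.path.join(directory, data_file_for(control_name)))
    return hash1.lower() == hash2


# Small self-contained demonstration using a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    payload = b"<data>hello</data>"
    with open(os.path.join(tmp, "data1.xml"), "wb") as handle:
        handle.write(payload)
    with open(os.path.join(tmp, "data1_control.json"), "w") as handle:
        json.dump({"md5": hashlib.md5(payload).hexdigest()}, handle)
    matches = is_valid(tmp, "data1_control.json")

print(matches)
```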
10-08-2017
02:03 PM
1 Kudo
@Kiem Nguyen It depends on which part you want to load balance. If it's data reception, you need to use a load balancer, since NiFi is only listening. If it's data processing, you can receive data on one node and use S2S and an RPG to distribute the load to the other nodes, so that the data is processed on the whole cluster. Note that in this case you have no high availability for data reception. Your clients are configured with the address of one node, so if this node goes down they won't be able to get data into NiFi and you can lose data. That's another benefit of having a load balancer. I hope this helps.