Copy, paste and run the following code:

val data = Array(1, 2, 3, 4, 5)                     // create Array of Integers
val dataRDD = sc.parallelize(data)                  // create an RDD
val dataDF = dataRDD.toDF()                         // convert RDD to DataFrame
dataDF.write.parquet("data.parquet")                // write to parquet
val newDataDF = sqlContext.read.parquet("data.parquet") // read back parquet to DF
newDataDF.show()                                    // show contents

If you run this code in a Zeppelin notebook, you will see the following output:

data: Array[Int] = Array(1, 2, 3, 4, 5)
dataRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:31
dataDF: org.apache.spark.sql.DataFrame = [_1: int]
newDataDF: org.apache.spark.sql.DataFrame = [_1: int]
+---+
| _1|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
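If you would rather have a named column than the default _1 shown above, here is a small variation (a sketch, assuming the same Zeppelin or spark-shell session where the SQLContext implicits are already in scope; the column name "value" is just an example):

val namedDF = sc.parallelize(data).toDF("value")    // name the single column "value"
namedDF.show()                                      // shows a "value" column instead of _1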
Comments
Master Guru

Great sample code. In most of my Spark apps when working with Parquet, I have a few configurations that help.

There are some SparkConf settings that will help when working with Parquet files.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val sparkConf = new SparkConf()
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")       // Parquet compression codec
sparkConf.set("spark.sql.parquet.mergeSchema", "true")               // merge schemas from all Parquet part files
sparkConf.set("spark.sql.parquet.binaryAsString", "true")            // interpret binary columns as strings (for files written by older tools)

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)   // Kryo serialization
sparkConf.set("spark.sql.tungsten.enabled", "true")                  // Tungsten optimizations
sparkConf.set("spark.eventLog.enabled", "true")                      // event log for DAG and metrics inspection
sparkConf.set("spark.io.compression.codec", "snappy")                // compression for shuffle and broadcast data
sparkConf.set("spark.rdd.compress", "true")                          // compress serialized RDD partitions
sparkConf.set("spark.streaming.backpressure.enabled", "true")        // backpressure for streaming jobs

Some of the compression settings are really important for certain use cases. You also often need to turn them off or switch which codec you use depending on the use case (batch, streaming, SQL, large files, small files, many partitions, ...).
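For example, one way to switch codecs for a particular job at runtime (a minimal sketch, assuming the sqlContext and dataDF from the article above; the output paths are just placeholders):

sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")     // smaller files, slower to write
dataDF.write.parquet("data_gzip.parquet")

sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")   // faster, a good default for frequently read data
dataDF.write.parquet("data_snappy.parquet")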

The event log is enabled so you can look at how those Parquet files are processed in the DAGs and metrics.

Before you write some Spark SQL against that file, make sure you register a table name.

If you don't want a write that will fail if the directory/file already exists, you can choose Append mode to add to it. It depends on your use case.

df1.registerTempTable("MyTableName")
val results = sqlContext.sql("SELECT name FROM MyTableName")


df1.write.mode(org.apache.spark.sql.SaveMode.Append).parquet("data.parquet")

If you want to look at the data from the command line after you write it, you can build parquet-tools. This requires the Java JDK, Git and Maven to be installed.

git clone -b apache-parquet-1.8.0 https://github.com/apache/parquet-mr.git
cd parquet-mr 
cd parquet-tools 
mvn clean package -Plocal
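Once the build finishes, you can point the tool at the file written above (a sketch; the exact jar name under target/ depends on the version you checked out):

java -jar target/parquet-tools-1.8.0.jar schema data.parquet     # print the Parquet schema
java -jar target/parquet-tools-1.8.0.jar head -n 5 data.parquet  # print the first 5 records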
Rising Star

Hi Robert,

Thanks for the details. Could you help me with reading a Parquet file?

I have loaded some data into Hive, and to validate it I ran the TopNotch script (https://github.com/blackrock/TopNotch), which uses Spark SQL. The script has written the bad records to a fileName.gz.parquet file in HDFS under my home directory.

Now I want to read/see these invalid records. I have tried the script above, but it fails. Can the script above be used to read data from a Parquet file?

val newDataDF = sqlContext.read.parquet("/user/user1/topnotch/part-r-00000-1513f167-1c5a-4ca8-bb08-6b7cb70a64dc.gz.parquet")

The above line throws a "not found" error. I want these invalid records loaded into a Hive table for querying:

parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
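In other words, something like this rough sketch is what I am aiming for (assuming a HiveContext; the directory and table names are just placeholders):

// read the whole TopNotch output directory rather than one part file
val badRecords = sqlContext.read.parquet("/user/user1/topnotch")
badRecords.registerTempTable("badRecords")           // for ad-hoc Spark SQL
badRecords.write.saveAsTable("badrecords_hive")      // persist as a Hive table (needs a HiveContext)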

Thank you.

New Contributor

Great!!

I set up a Spark cluster with 2 workers. I save a DataFrame using partitionBy("column x") in Parquet format to some path on each worker. The issue is that I am able to save it, but when I try to read it back I get these errors:
- Could not read footer for file file´status ......
- unable to specify Schema ...
Any suggestions?
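Roughly what I am doing is the following sketch (the path and column name are placeholders; in my setup the path is local to each worker):

df.write.partitionBy("column_x").parquet("/some/path/data.parquet")   // save partitioned by a column
val readBack = sqlContext.read.parquet("/some/path/data.parquet")     // this is the step that fails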