Created on 03-05-2016 12:32 AM
Copy, paste and run the following code:
val data = Array(1, 2, 3, 4, 5)                          // create an Array of integers
val dataRDD = sc.parallelize(data)                       // create an RDD
val dataDF = dataRDD.toDF()                              // convert RDD to DataFrame
dataDF.write.parquet("data.parquet")                     // write to Parquet
val newDataDF = sqlContext.read.parquet("data.parquet")  // read Parquet back into a DataFrame
newDataDF.show()                                         // show contents
If you run this code in a Zeppelin notebook you will see the following output data:
data: Array[Int] = Array(1, 2, 3, 4, 5)
dataRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:31
dataDF: org.apache.spark.sql.DataFrame = [_1: int]
newDataDF: org.apache.spark.sql.DataFrame = [_1: int]
+---+
| _1|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
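If you would rather have a descriptive column name than the auto-generated _1, you can pass one to toDF(). A minimal sketch, assuming the same spark-shell/Zeppelin session (sc and sqlContext available, implicits imported); the column name "value" and the path "data_named.parquet" are just illustrations:

```scala
import sqlContext.implicits._  // needed for toDF() outside the shell

// Same round trip as above, but with a named column
val namedDF = sc.parallelize(Array(1, 2, 3, 4, 5)).toDF("value")
namedDF.write.parquet("data_named.parquet")          // hypothetical output path
val back = sqlContext.read.parquet("data_named.parquet")
back.printSchema()                                   // column shows up as "value", not "_1"
```

The schema travels with the Parquet file, so the name survives the write/read round trip.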
Created on 04-27-2016 03:47 PM
Great sample code. In most of my Spark apps when working with Parquet, I have a few configurations that help.
Here are some Spark configuration settings that help when working with Parquet files.
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val sparkConf = new SparkConf()
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("spark.sql.parquet.mergeSchema", "true")
sparkConf.set("spark.sql.parquet.binaryAsString", "true")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "true")
sparkConf.set("spark.streaming.backpressure.enabled", "true")
Some of the compression settings are really important for different use cases. You will also often need to turn them off or switch codecs depending on the use case (batch, streaming, SQL, large or small data, many partitions, ...).
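As a sketch of switching codecs between writes: in Spark 1.x the Parquet codec is a SQLContext setting, so you can change it per job. The DataFrames and paths below are hypothetical, just to illustrate the trade-off:

```scala
// gzip compresses better; good for cold/archival data (names and paths are illustrative)
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
archiveDF.write.parquet("archive.parquet")

// snappy is faster to read and write; good for hot data that is queried often
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
hotDF.write.parquet("hot.parquet")
```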
With the event log enabled, you can see how those Parquet files are processed in the DAGs and metrics.
Before you write some Spark SQL against that file, make sure you register a table name.
If you don't want a write that fails when the directory/file already exists, you can choose Append mode to add to it. It depends on your use case.
df1.registerTempTable("MyTableName")
val results = sqlContext.sql("SELECT name FROM MyTableName")
df1.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).parquet("data.parquet")
If you want to look at the data from the command line after you write it, you can download parquet-tools. This requires the Java JDK, git, and Maven to be installed.
git clone -b apache-parquet-1.8.0 https://github.com/apache/parquet-mr.git
cd parquet-mr
cd parquet-tools
mvn clean package -Plocal
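Once the build finishes, you can run the tool from the built jar. A sketch of the common subcommands; the jar name and target/ location assume the default local build, and data.parquet is the file written earlier:

```shell
# Print the file's schema
java -jar target/parquet-tools-1.8.0.jar schema data.parquet

# Show the first few records
java -jar target/parquet-tools-1.8.0.jar head -n 5 data.parquet

# Show row-group and column metadata (compression codec, counts, etc.)
java -jar target/parquet-tools-1.8.0.jar meta data.parquet
```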
Created on 05-26-2016 03:05 PM
Hi Robert,
Thanks for the details. Could you help me read a Parquet file?
I have loaded some data in Hive, and to validate it I ran the TopNotch script (https://github.com/blackrock/TopNotch). This script, which uses Spark SQL, wrote the bad records to a fileName.gz.parquet file in HDFS under my home directory.
Now I want to read/see these invalid records. I have tried the script above but it fails. Can that script be used to read data from a Parquet file?
val newDataDF = sqlContext.read.parquet("/user/user1/topnotch/part-r-00000-1513f167-1c5a-4ca8-bb08-6b7cb70a64dc.gz.parquet")
The line above throws a "not found" error. I want these invalid records to be loaded into a Hive table for querying:
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
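Putting those two pieces together, a minimal sketch of the intended flow: read the Parquet output directory (rather than a single part file, whose name can change between runs), register it, then query. This assumes the directory path and the name/age columns from the snippets above; adjust both to your data:

```scala
// Read the whole output directory so all part files are picked up
val parquetFile = sqlContext.read.parquet("/user/user1/topnotch")  // illustrative directory
parquetFile.registerTempTable("parquetFile")

// Query the registered temp table (assumes "name" and "age" columns exist)
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
```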
Thank you.
Created on 08-28-2017 11:25 AM
Great!!
I set up a Spark cluster with 2 workers. I save a DataFrame using partitionBy("column x") in Parquet format to a path on each worker. I am able to save it, but when I try to read it back I get these errors:
- Could not read footer for file FileStatus ......
- unable to specify Schema ...
Any suggestions?
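One common cause of "Could not read footer" on a multi-worker cluster is writing to a worker-local path, so each machine holds only its own part files and no reader sees a complete dataset. A hedged sketch of writing to shared storage instead; the HDFS path, DataFrame name, and partition column "x" are illustrative, not from the post:

```scala
// Write partitioned output to storage every executor and the driver can reach (e.g. HDFS)
df.write.partitionBy("x").parquet("hdfs:///tmp/partitioned.parquet")

// Reading the same shared path back recovers both the data and the partition column
val back = sqlContext.read.parquet("hdfs:///tmp/partitioned.parquet")
back.show()
```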