I have a Hadoop cluster with 4 data nodes and 4 node managers, with HDFS replication factor = 3 and dfs block size = 128 MB. I have a Spark application that reads data from a text file and writes it to HDFS as a Parquet file with Parquet block size = 512 MB; the written Parquet file is about 1 GB. I checked this file on HDFS with the command:
hadoop fsck /output/xxx.snappy.parquet -files -blocks -locations
The result: the file has 2 blocks, each with replication = 3 (which matches our expectation).
But when I use Spark to read this Parquet file and print the number of partitions:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
val df1 = spark.read.parquet("/output/xxx.snappy.parquet")
df1.rdd.partitions.length
the result is 7.
And in the Spark UI I see 7 tasks: 2 tasks with input data = 512 MB and 5 tasks with input data = 0.
So can you help me explain:
- Why does "hadoop fsck /parquet_file -files -blocks -locations" print 2 blocks, but reading the file with Spark returns 7 partitions? (As I understand it, a Spark partition corresponds to an HDFS block.)
Hi, could you print the partition sizes like this? A snappy-compressed Parquet file is splittable by byte range. Usually Spark splits a large snappy Parquet file into multiple partitions of size spark.sql.files.maxPartitionBytes.
@Tin Huynh I think you should think of the Parquet row-group size as defining how much data, or how many rows, is grouped together to be processed by a single task (the row groups exist for splittability, but each can only be processed by one task). So in terms of the HDFS block size, Spark would get ~7 partitions (1 GB / 128 MB), but the row-group size dictates that all of that data is rolled up into two 512 MB chunks, leaving 5 empty partitions. That said, most people recommend setting parquet.block.size = dfs.block.size, or setting parquet.block.size so that dfs.block.size is some integer multiple of parquet.block.size (for more parallelism). This ensures there is some harmony between the number of partitions (parallelism) and the number of row groups.
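The arithmetic above can be sketched in plain Scala. This is a deliberate simplification of Spark's actual file-split planning (the real code also accounts for spark.sql.files.openCostInBytes and bin-packs small files together); `numSplits` and `SplitEstimate` are names made up for this sketch, with `maxSplitBytes` playing the role of spark.sql.files.maxPartitionBytes:

```scala
object SplitEstimate {
  // Simplified view of how Spark carves one large file into read splits:
  // ceil(fileSize / maxSplitBytes), computed with integer arithmetic.
  def numSplits(fileSizeBytes: Long, maxSplitBytes: Long): Long =
    (fileSizeBytes + maxSplitBytes - 1) / maxSplitBytes

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // 7 tasks at the default 128 MB split target implies the file is really
    // between 6 x 128 MB and 7 x 128 MB on disk, i.e. a bit under 1 GB.
    println(numSplits(896 * mb, 128 * mb)) // 7 splits
    // Only 2 of those 7 splits contain the start of a 512 MB row group,
    // so only 2 tasks actually read data; the other 5 read nothing.
  }
}
```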
Basically, either use a different parquet.block.size or repartition your data; the latter, however, is a pretty inefficient solution.
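A minimal sketch of both options, assuming an existing SparkSession `spark` and DataFrame `df`; the output path is a placeholder, and whether the writer option reaches Parquet can depend on your Spark version (setting it on the Hadoop configuration via spark.sparkContext.hadoopConfiguration is the more traditional route):

```scala
// Write side: align the Parquet row-group size with the HDFS block size,
// so each 128 MB read split contains a row group to process.
df.write
  .option("parquet.block.size", 128L * 1024 * 1024)
  .parquet("/output/aligned.parquet")

// Read side alternative: raise the split target to the row-group size,
// so each existing 512 MB row group maps to exactly one partition.
spark.conf.set("spark.sql.files.maxPartitionBytes", 512L * 1024 * 1024)
```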
I set up a Spark cluster with 2 workers. I save a DataFrame using partitionBy("column x") in Parquet format to a path on each worker. I am able to save it, but when I try to read it back I get these errors:
- Could not read footer for file FileStatus ......
- unable to specify Schema ...
Any suggestions?