10-29-2016 06:04 AM - edited 10-29-2016 06:15 AM
on Spark 1.6 trying to read a Parquet partitioned table created by Impala to DataFrame. Running a code in spark-shell with these options:
do not prune old partitions.
The spark code:
val q= "select day_id, co_id, dir_id from table where day_id >= 20161001 and day_id < 20161101" val df = sqlContext.sql( q ) df.cache df.count
Spark reads ALL the partitions, in SparkUI in Storage section I see RDD with all partitions:
RDD Name Storage Level Cached Partitions Fraction Cached Size in Memory Size in ExternalBlockStore Size on Disk Project [co_id#15,dir_id#16 : +- TungstenExchange hashpartitioning(co_id#15,200), None : +- Filter isnotnull(co_id#15) : +- Scan ParquetRelation: default.table[co_id#15,day_id#3,dir_id#16] InputPaths: hdfs://nameservice1/warehouse/table/day_id=20070403, hdfs://nameservice1/warehouse/table/day_id=20070404, hdfs://nameservice1/warehouse/table/day_id=20070405, ....
val q= "select day_id, co_id, dir_id from table where day_id >= 20161001 and day_id < 20161101" val df = sqlContext.sql( q ).filter( 'day_id > 20161001 ) df.cache df.count
But did not prune the partitions either,
11-22-2016 12:41 AM
It seems the issue is due to a missmatch between the Hive and Parquet schemas:
Basically, "Hive is case insensitive, while Parquet is not"
In our case, just executing the select statement in lower case solved the issue.
But not sure in your case because your statement is already in lower case :-S
11-23-2016 02:58 AM
Spark SQL in Spark 1.6 prints the physical plan which is supposed to list all the available dirs (InputPaths) rather actual directories to scan while fetching the results for the queries.
But while reading the data, it reads only relevant partition file and the same can be seen from "INFO" log messages. You can turn on the "INFO" level logging by setting: