Created 10-19-2017 01:31 PM
Hi,
I am working with Spark 1.6.1 and Hive 1.2.0, and I have a Hive managed table stored in ORC format.
I am trying to query this table and make sure predicates are pushed down:
#1) Using Spark SQL, through the Spark Thrift server.
When I look at the explain plan, I always see HiveTableScan, and I believe this reads the entire data set. How can I make sure predicates are pushed down?
#2) Using Hive, through the Hive Thrift server.
Here I always see a MapReduce job being triggered. How can I validate that predicate pushdown is happening?
I want to make sure the data is read in an optimized way.
Thanks for your help.
Regards
Created 10-20-2017 03:28 PM
Please set "spark.sql.orc.filterPushdown=true" in the Spark Thrift server configs.
Refer to this guide for more details on dealing with ORC: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.2/bk_spark-component-guide/content/orc-spark....
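The flag can also be set per session in Spark 1.6, which makes it easy to verify before changing the Thrift server configs. A minimal spark-shell sketch (the path and table names here are placeholders, not from your cluster):

// Sketch for a Spark 1.6 shell: enable ORC filter pushdown for this session,
// then check the physical plan for a PushedFilters entry on the ORC scan.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val df = sqlContext.read.format("orc").load("/path/to/orc/files") // placeholder path
df.registerTempTable("t")
sqlContext.sql("select id from t where cat = 27").explain
// expected to show: Scan OrcRelation[...] ... PushedFilters: [EqualTo(cat,27)]

For the Hive side (#2): as far as I recall on Hive 1.2, hive.optimize.ppd (on by default) controls predicate pushdown in the planner, and hive.optimize.index.filter=true is additionally needed to push the predicate down into the ORC reader; with both set, an EXPLAIN of the query should show the predicate attached to the TableScan operator as filterExpr.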
Created 10-24-2017 03:39 PM
Hello Sandeep,
In my case the pushdown is not happening if I read a Hive table directly using Spark SQL.
If I load the same ORC files using Spark, then I can see the pushdown happening.
Here are my execution plans:
When I read the Hive table using Spark SQL:
scala> val hhhhhlogsv2 = sqlContext.read.table("hhhhhlogsv5")
hhhhhlogsv2: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]

scala> sqlContext.sql("select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000").explain
17/10/24 15:36:24 INFO ParseDriver: Parsing command: select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000
17/10/24 15:36:24 INFO ParseDriver: Parse Completed
17/10/24 15:36:24 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 399.3 KB, free 841.6 KB)
17/10/24 15:36:24 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 42.8 KB, free 884.4 KB)
17/10/24 15:36:24 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 172.21.158.61:46523 (size: 42.8 KB, free: 511.4 MB)
17/10/24 15:36:24 INFO SparkContext: Created broadcast 19 from explain at <console>:39

== Physical Plan ==
TakeOrderedAndProject(limit=10000, orderBy=[id#248 DESC], output=[id#248])
+- ConvertToSafe
   +- Project [id#248]
      +- Filter (cat#253 = 27)
         +- HiveTableScan [id#248,cat#253], MetastoreRelation default, hhhhhlogsv5, None, [(cdt#245 = 20171002),(catpartkey#246 = others),(usrpartkey#247 = hhhUsers)]
Now when I load the files using Spark:
scala> val hhhhhlogsv2 = sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5")
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5 on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003 on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers on driver
hhhhhlogsv2: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]

scala> hhhhhlogsv2.registerTempTable("tempo")

scala> sqlContext.sql("select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000").explain
17/10/24 15:38:22 INFO ParseDriver: Parsing command: select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000
17/10/24 15:38:22 INFO ParseDriver: Parse Completed
17/10/24 15:38:22 INFO DataSourceStrategy: Selected 0 partitions out of 1, pruned 100.0% partitions.
17/10/24 15:38:22 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 164.5 KB, free 1048.9 KB)
17/10/24 15:38:22 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 18.3 KB, free 1067.2 KB)
17/10/24 15:38:22 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 172.21.158.61:46523 (size: 18.3 KB, free: 511.4 MB)
17/10/24 15:38:22 INFO SparkContext: Created broadcast 20 from explain at <console>:39

== Physical Plan ==
TakeOrderedAndProject(limit=10000, orderBy=[id#287 DESC], output=[id#287])
+- ConvertToSafe
   +- Project [id#287]
      +- Filter (cat#292 = 27)
         +- Scan OrcRelation[id#287,cat#292] InputPaths: maprfs:///user/hive/warehouse/hhhhhlogsv5, PushedFilters: [EqualTo(cat,27)]
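For easier comparison, here is the same experiment condensed into one session (a sketch; t1 and t2 are throwaway names I made up for this snippet):

// spark.sql.orc.filterPushdown is enabled for the whole session.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
// Path 1: through the metastore table -- plan shows HiveTableScan, no PushedFilters.
sqlContext.read.table("hhhhhlogsv5").registerTempTable("t1")
sqlContext.sql("select id from t1 where cat = 27").explain
// Path 2: loading the same ORC files directly -- plan shows
// Scan OrcRelation ... PushedFilters: [EqualTo(cat,27)].
sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5").registerTempTable("t2")
sqlContext.sql("select id from t2 where cat = 27").explain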
Any pointers? Thank you.