Support Questions

Spark 1.6.1 with Orc Files


Hi,

I am working with Spark 1.6.1 and Hive 1.2.0, and I have a Hive-managed table stored in ORC format.

I am trying to query this table and make sure predicates are pushed down:

#1) Using Spark SQL, via the Spark Thrift server

When I look at the explain plan, I always see HiveTableScan, and I believe this reads the entire table. How can I make sure predicates are pushed down?

#2) Using Hive, via the Hive Thrift server

Here I always see a MapReduce job being triggered. How can I validate that predicate pushdown is happening?

I want to make sure the data is read in an optimized way.

Thanks for your help.

Regards

2 REPLIES

Re: Spark 1.6.1 with Orc Files

@siva gudavalli

Please set "spark.sql.orc.filterPushdown=true" in the Spark Thrift server configs.
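If you are testing from a spark-shell session rather than through the Thrift server, the same flag can be set programmatically. A minimal sketch, assuming a Spark 1.6 `SQLContext` already bound to `sqlContext` (table and column names taken from the question):

```scala
// Enable ORC predicate pushdown for this session (Spark 1.6 SQLContext API).
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

// Re-run the query and look for "PushedFilters: [...]" in the physical plan;
// its presence indicates the filter was handed down to the ORC reader.
sqlContext.sql("select id from hhhhhlogsv5 where cat = 27").explain
```
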

Refer to this guide for more details on working with ORC: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.2/bk_spark-component-guide/content/orc-spark....
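Note also that a plan showing HiveTableScan means the query went through Hive's MetastoreRelation path, which in Spark 1.6 does not report PushedFilters even with the flag above set. HDP builds of Spark 1.6 carry a setting to convert metastore ORC tables to Spark's native ORC reader; this is a hedged sketch (whether `spark.sql.hive.convertMetastoreOrc` is available in your build is an assumption here, so please verify it against the guide linked above):

```scala
// Assumption: spark.sql.hive.convertMetastoreOrc exists in this Spark build.
// When enabled, Spark reads Hive metastore ORC tables through its native
// OrcRelation (which supports PushedFilters) instead of HiveTableScan.
sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "true")
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
```

With both settings in effect, the explain output should show "Scan OrcRelation ... PushedFilters: [...]" rather than "HiveTableScan" if the conversion applies.
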

Re: Spark 1.6.1 with Orc Files


Hello Sandeep,

In my case the pushdown is not happening when I read a Hive table directly using Spark SQL.

If I load the same ORC files using Spark directly, then I can see the pushdown happening.

Here are my execution plans.

When I read the Hive table using Spark SQL:

scala> val hhhhhlogsv2 = sqlContext.read.table("hhhhhlogsv5")
hhhhhlogsv2: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]


scala> sqlContext.sql("select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000" ).explain
17/10/24 15:36:24 INFO ParseDriver: Parsing command: select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000
17/10/24 15:36:24 INFO ParseDriver: Parse Completed
17/10/24 15:36:24 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 399.3 KB, free 841.6 KB)
17/10/24 15:36:24 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 42.8 KB, free 884.4 KB)
17/10/24 15:36:24 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 172.21.158.61:46523 (size: 42.8 KB, free: 511.4 MB)
17/10/24 15:36:24 INFO SparkContext: Created broadcast 19 from explain at <console>:39
== Physical Plan ==
TakeOrderedAndProject(limit=10000, orderBy=[id#248 DESC], output=[id#248])
+- ConvertToSafe
   +- Project [id#248]
      +- Filter (cat#253 = 27)
         +- HiveTableScan [id#248,cat#253], MetastoreRelation default, hhhhhlogsv5, None, [(cdt#245 = 20171002),(catpartkey#246 = others),(usrpartkey#247 = hhhUsers)]

Now, when I load the same files using Spark directly:

scala> val hhhhhlogsv2 = sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5")
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5 on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003 on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers on driver
hhhhhlogsv2: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]


scala> hhhhhlogsv2.registerTempTable("tempo")


scala> sqlContext.sql("select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000" ).explain
17/10/24 15:38:22 INFO ParseDriver: Parsing command: select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000
17/10/24 15:38:22 INFO ParseDriver: Parse Completed
17/10/24 15:38:22 INFO DataSourceStrategy: Selected 0 partitions out of 1, pruned 100.0% partitions.
17/10/24 15:38:22 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 164.5 KB, free 1048.9 KB)
17/10/24 15:38:22 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 18.3 KB, free 1067.2 KB)
17/10/24 15:38:22 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 172.21.158.61:46523 (size: 18.3 KB, free: 511.4 MB)
17/10/24 15:38:22 INFO SparkContext: Created broadcast 20 from explain at <console>:39
== Physical Plan ==
TakeOrderedAndProject(limit=10000, orderBy=[id#287 DESC], output=[id#287])
+- ConvertToSafe
   +- Project [id#287]
      +- Filter (cat#292 = 27)
         +- Scan OrcRelation[id#287,cat#292] InputPaths: maprfs:///user/hive/warehouse/hhhhhlogsv5, PushedFilters: [EqualTo(cat,27)]

Any pointers? Thank you.