
Spark 1.6.1 with ORC Files


Hi,

I am working with Spark 1.6.1 and Hive 1.2.0, and I have a Hive-managed table stored in ORC format.

I am trying to query this table and make sure predicates are pushed down:

#1) Using Spark SQL, via the Spark Thrift Server.

When I look at the explain plan, I always see HiveTableScan, and I believe this reads the entire data. How can I make sure predicates are pushed down?

#2) Using Hive, via the Hive Thrift Server.

Here a MapReduce job is triggered every time. How can I validate that predicate pushdown is happening? (A sketch of the Hive-side settings I am checking follows below.)

In both cases I want to make sure the data is read in an optimized way.
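For the Hive side, this is what I understand I should be checking (a minimal sketch using standard Hive settings; mytable and the cat = 27 predicate are placeholders, not my real table):

-- In beeline / the Hive CLI:
SET hive.optimize.ppd=true;           -- predicate pushdown in the optimizer (default true)
SET hive.optimize.index.filter=true;  -- push predicates into the ORC reader for stripe/row-group skipping (default false)
EXPLAIN SELECT id FROM mytable WHERE cat = 27;
-- If pushdown reaches the storage layer, the TableScan operator in the plan
-- should carry a filterExpr: (cat = 27) entry.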

Thanks for your help.

Regards

2 REPLIES

@siva gudavalli

Please set "spark.sql.orc.filterPushdown=true" in the Spark Thrift Server configs.
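For example, from an interactive HiveContext (a minimal sketch; mytable and the predicate are placeholders, and for the Thrift Server the same key goes into its spark-defaults.conf or is passed with --conf at start-up):

// Spark 1.6: ORC filter pushdown is off by default; enable it, then
// confirm in the physical plan that the predicate reaches the ORC scan.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sqlContext.sql("SELECT id FROM mytable WHERE cat = 27").explain()
// Pushed predicates show up as "PushedFilters: [EqualTo(cat,27)]" on the scan;
// a plain HiveTableScan with a Filter above it means nothing was pushed down.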

Refer to this guide for more details on working with ORC in Spark: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.2/bk_spark-component-guide/content/orc-spark....


Hello Sandeep,

In my case, the pushdown does not happen when I read a Hive table directly using Spark SQL.

If I load the same ORC files directly using Spark, then I can see the pushdown happening.

Here are my execution plans.

First, when I read the Hive table using Spark SQL:

scala> val hhhhhlogsv2 = sqlContext.read.table("hhhhhlogsv5")
hhhhhlogsv2: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]


scala> sqlContext.sql("select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000" ).explain
17/10/24 15:36:24 INFO ParseDriver: Parsing command: select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000
17/10/24 15:36:24 INFO ParseDriver: Parse Completed
17/10/24 15:36:24 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 399.3 KB, free 841.6 KB)
17/10/24 15:36:24 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 42.8 KB, free 884.4 KB)
17/10/24 15:36:24 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 172.21.158.61:46523 (size: 42.8 KB, free: 511.4 MB)
17/10/24 15:36:24 INFO SparkContext: Created broadcast 19 from explain at <console>:39
== Physical Plan ==
TakeOrderedAndProject(limit=10000, orderBy=[id#248 DESC], output=[id#248])
+- ConvertToSafe
   +- Project [id#248]
      +- Filter (cat#253 = 27)
         +- HiveTableScan [id#248,cat#253], MetastoreRelation default, hhhhhlogsv5, None, [(cdt#245 = 20171002),(catpartkey#246 = others),(usrpartkey#247 = hhhUsers)]

Note that cat=27 stays in a Filter above the HiveTableScan, with no PushedFilters entry. Now, when I load the files directly using Spark:

scala> val hhhhhlogsv2 = sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5")
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5 on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003 on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others on driver
17/10/24 15:38:09 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers on driver
hhhhhlogsv2: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]


scala> hhhhhlogsv2.registerTempTable("tempo")


scala> sqlContext.sql("select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000" ).explain
17/10/24 15:38:22 INFO ParseDriver: Parsing command: select id from tempo where cdt=20171002 and catpartkey = 'others' and usrpartkey = 'hhhUsers' and cat=27 order by id desc limit 10000
17/10/24 15:38:22 INFO ParseDriver: Parse Completed
17/10/24 15:38:22 INFO DataSourceStrategy: Selected 0 partitions out of 1, pruned 100.0% partitions.
17/10/24 15:38:22 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 164.5 KB, free 1048.9 KB)
17/10/24 15:38:22 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 18.3 KB, free 1067.2 KB)
17/10/24 15:38:22 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 172.21.158.61:46523 (size: 18.3 KB, free: 511.4 MB)
17/10/24 15:38:22 INFO SparkContext: Created broadcast 20 from explain at <console>:39
== Physical Plan ==
TakeOrderedAndProject(limit=10000, orderBy=[id#287 DESC], output=[id#287])
+- ConvertToSafe
   +- Project [id#287]
      +- Filter (cat#292 = 27)
         +- Scan OrcRelation[id#287,cat#292] InputPaths: maprfs:///user/hive/warehouse/hhhhhlogsv5, PushedFilters: [EqualTo(cat,27)]

Any pointers? Thank you.
