Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Spark SQL: Limit clause performance issues

avatar
Expert Contributor

I have a huge Hive Table (ORC) and I want to select just a few rows of the table (in Zeppelin).

%spark
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val df1 = sqlContext.sql("SELECT * FROM mydb.myhugetable LIMIT 1") // Takes 10 mins
val df2 = sqlContext.sql("SELECT * FROM mydb.myhugetable").limit(1) // Takes 10 mins

Using the LIMIT clause in my SQL statement or the corresponding dataframe method DF.limit doesn't help, as the query still takes too long. It seems to read the whole table first and then just returning the n rows.

How can I achieve, that the filter limits the data during running the SQL and therefore runs faster?

Shouldn't the filter pushdown help here? I can't see any difference with the setting spark.sql.orc.filterPushdown set to true or false. Thank you!

1 ACCEPTED SOLUTION

avatar

@Daniel Müller

Could you share the explain extended for the above query? From the logical/physical plan details you could see whether filter pushdown is includes the limit. If this is spark with llap integration, I know this is not supported previous HDP 3.0. Starting HDP 3.0 we have added the HWC (hive warehouse connector) for spark, which will work as expected.

HTH

*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.

View solution in original post

4 REPLIES 4

avatar

@Daniel Müller

Could you share the explain extended for the above query? From the logical/physical plan details you could see whether filter pushdown is includes the limit. If this is spark with llap integration, I know this is not supported previous HDP 3.0. Starting HDP 3.0 we have added the HWC (hive warehouse connector) for spark, which will work as expected.

HTH

*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.

avatar
Expert Contributor

Here the extended explain of the DF, LLAP is not enabled in our cluster:

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [importtime: timestamp, tester: string ... 21 more fields]
== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Project [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
      +- SubqueryAlias mytable
         +- Relation[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] orc
== Analyzed Logical Plan ==
importtime: timestamp, tester: string, testerhead: string, matchcode: string, revision: string, usertext: string, teststufe: string, temp: string, filedat: date, starttime: timestamp, endtime: timestamp, lotnrc: string, wafernr: string, testname: string, testnumber: string, testtype: string, unit: string, highl: string, lowl: string, highs: string, lows: string, valuelist: string, hashvalue: int
GlobalLimit 1
+- LocalLimit 1
   +- Project [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
      +- SubqueryAlias mytable
         +- Relation[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] orc
== Optimized Logical Plan ==
InMemoryRelation [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- CollectLimit 1
      +- *FileScan orc mydb.mytable[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] Batched: false, Format: ORC, Location: CatalogFileIndex[hdfs://hdp-m-01:8020/apps/hive/warehouse/mydb.db/mytab..., PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<importtime:timestamp,tester:string,testerhead:string,matchcode:string,revision:string,user...
== Physical Plan ==
InMemoryTableScan [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
   +- InMemoryRelation [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- CollectLimit 1
            +- *FileScan orc mydb.mytable[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] Batched: false, Format: ORC, Location: CatalogFileIndex[hdfs://hdp-m-01:8020/apps/hive/warehouse/mydb.db/mytab..., PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<importtime:timestamp,tester:string,testerhead:string,matchcode:string,revision:string,user...
<br>

avatar

@Daniel Müller As I thought the PushedFilters are empty. I checked the spark.sql.orc.filterPushdown implementation details and looks like LIMIT is not supported. You can read more by looking at inline comments here:

https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/Or...

HTH

*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.

avatar
Expert Contributor

@Felix Albani Thank you for your help! Without the LIMIT clause, the Job works perfectly (and in parallel).