Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Spark SQL: Limit clause performance issues

Solved Go to solution

Spark SQL: Limit clause performance issues

Expert Contributor

I have a huge Hive Table (ORC) and I want to select just a few rows of the table (in Zeppelin).

%spark
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val df1 = sqlContext.sql("SELECT * FROM mydb.myhugetable LIMIT 1") // Takes 10 mins
val df2 = sqlContext.sql("SELECT * FROM mydb.myhugetable").limit(1) // Takes 10 mins

Using the LIMIT clause in my SQL statement or the corresponding dataframe method DF.limit doesn't help, as the query still takes too long. It seems to read the whole table first and then just returning the n rows.

How can I achieve, that the filter limits the data during running the SQL and therefore runs faster?

Shouldn't the filter pushdown help here? I can't see any difference with the setting spark.sql.orc.filterPushdown set to true or false. Thank you!

1 ACCEPTED SOLUTION

Accepted Solutions

Re: Spark SQL: Limit clause performance issues

@Daniel Müller

Could you share the explain extended for the above query? From the logical/physical plan details you could see whether filter pushdown is includes the limit. If this is spark with llap integration, I know this is not supported previous HDP 3.0. Starting HDP 3.0 we have added the HWC (hive warehouse connector) for spark, which will work as expected.

HTH

*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.

4 REPLIES 4

Re: Spark SQL: Limit clause performance issues

@Daniel Müller

Could you share the explain extended for the above query? From the logical/physical plan details you could see whether filter pushdown is includes the limit. If this is spark with llap integration, I know this is not supported previous HDP 3.0. Starting HDP 3.0 we have added the HWC (hive warehouse connector) for spark, which will work as expected.

HTH

*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.

Re: Spark SQL: Limit clause performance issues

Expert Contributor

Here the extended explain of the DF, LLAP is not enabled in our cluster:

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [importtime: timestamp, tester: string ... 21 more fields]
== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Project [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
      +- SubqueryAlias mytable
         +- Relation[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] orc
== Analyzed Logical Plan ==
importtime: timestamp, tester: string, testerhead: string, matchcode: string, revision: string, usertext: string, teststufe: string, temp: string, filedat: date, starttime: timestamp, endtime: timestamp, lotnrc: string, wafernr: string, testname: string, testnumber: string, testtype: string, unit: string, highl: string, lowl: string, highs: string, lows: string, valuelist: string, hashvalue: int
GlobalLimit 1
+- LocalLimit 1
   +- Project [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
      +- SubqueryAlias mytable
         +- Relation[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] orc
== Optimized Logical Plan ==
InMemoryRelation [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- CollectLimit 1
      +- *FileScan orc mydb.mytable[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] Batched: false, Format: ORC, Location: CatalogFileIndex[hdfs://hdp-m-01:8020/apps/hive/warehouse/mydb.db/mytab..., PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<importtime:timestamp,tester:string,testerhead:string,matchcode:string,revision:string,user...
== Physical Plan ==
InMemoryTableScan [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
   +- InMemoryRelation [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- CollectLimit 1
            +- *FileScan orc mydb.mytable[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] Batched: false, Format: ORC, Location: CatalogFileIndex[hdfs://hdp-m-01:8020/apps/hive/warehouse/mydb.db/mytab..., PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<importtime:timestamp,tester:string,testerhead:string,matchcode:string,revision:string,user...
<br>

Re: Spark SQL: Limit clause performance issues

@Daniel Müller As I thought the PushedFilters are empty. I checked the spark.sql.orc.filterPushdown implementation details and looks like LIMIT is not supported. You can read more by looking at inline comments here:

https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/Or...

HTH

*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.

Highlighted

Re: Spark SQL: Limit clause performance issues

Expert Contributor

@Felix Albani Thank you for your help! Without the LIMIT clause, the Job works perfectly (and in parallel).

Don't have an account?
Coming from Hortonworks? Activate your account here