Created 09-11-2018 12:01 PM
I have a huge Hive Table (ORC) and I want to select just a few rows of the table (in Zeppelin).
%spark
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val df1 = sqlContext.sql("SELECT * FROM mydb.myhugetable LIMIT 1") // Takes 10 mins
val df2 = sqlContext.sql("SELECT * FROM mydb.myhugetable").limit(1) // Takes 10 mins
Using the LIMIT clause in my SQL statement or the corresponding DataFrame method DF.limit doesn't help, as the query still takes too long. It seems to read the whole table first and then just return the n rows.
How can I make the limit restrict the data while the query is running, so that it finishes faster?
Shouldn't the filter pushdown help here? I can't see any difference with the setting spark.sql.orc.filterPushdown set to true or false. Thank you!
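One idea I had is to restrict the scan before limiting, e.g. with a partition predicate. This is only a sketch: I'm using filedat as a placeholder, the table's actual partition column (and an existing value) would have to go there.
%spark
// Sketch only: prune the scan with a predicate first, then limit.
// "filedat" and the date are placeholders; substitute the real partition column and value.
val df3 = sqlContext.sql("SELECT * FROM mydb.myhugetable WHERE filedat = '2018-09-01' LIMIT 1")
df3.explain(true) // if the predicate hits the partition column, PartitionFilters should no longer be empty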
Created 09-11-2018 12:18 PM
Could you share the explain extended output for the above query? From the logical/physical plan details you can see whether the filter pushdown includes the limit. If this is Spark with LLAP integration, I know this was not supported prior to HDP 3.0. Starting with HDP 3.0 we have added the HWC (Hive Warehouse Connector) for Spark, which works as expected.
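For example, something like this should print the full set of plans (using the df1 from your snippet):
%spark
// Prints the parsed, analyzed, optimized and physical plans for the DataFrame
df1.explain(true)
// or the SQL form:
sqlContext.sql("EXPLAIN EXTENDED SELECT * FROM mydb.myhugetable LIMIT 1").show(false)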
HTH
*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.
Created 09-11-2018 02:06 PM
Here is the extended explain of the DF; LLAP is not enabled in our cluster:
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [importtime: timestamp, tester: string ... 21 more fields]

== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Project [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
      +- SubqueryAlias mytable
         +- Relation[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] orc

== Analyzed Logical Plan ==
importtime: timestamp, tester: string, testerhead: string, matchcode: string, revision: string, usertext: string, teststufe: string, temp: string, filedat: date, starttime: timestamp, endtime: timestamp, lotnrc: string, wafernr: string, testname: string, testnumber: string, testtype: string, unit: string, highl: string, lowl: string, highs: string, lows: string, valuelist: string, hashvalue: int
GlobalLimit 1
+- LocalLimit 1
   +- Project [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
      +- SubqueryAlias mytable
         +- Relation[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] orc

== Optimized Logical Plan ==
InMemoryRelation [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- CollectLimit 1
      +- *FileScan orc mydb.mytable[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] Batched: false, Format: ORC, Location: CatalogFileIndex[hdfs://hdp-m-01:8020/apps/hive/warehouse/mydb.db/mytab..., PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<importtime:timestamp,tester:string,testerhead:string,matchcode:string,revision:string,user...

== Physical Plan ==
InMemoryTableScan [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22]
+- InMemoryRelation [importtime#0, tester#1, testerhead#2, matchcode#3, revision#4, usertext#5, teststufe#6, temp#7, filedat#8, starttime#9, endtime#10, lotnrc#11, wafernr#12, testname#13, testnumber#14, testtype#15, unit#16, highl#17, lowl#18, highs#19, lows#20, valuelist#21, hashvalue#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- CollectLimit 1
      +- *FileScan orc mydb.mytable[importtime#0,tester#1,testerhead#2,matchcode#3,revision#4,usertext#5,teststufe#6,temp#7,filedat#8,starttime#9,endtime#10,lotnrc#11,wafernr#12,testname#13,testnumber#14,testtype#15,unit#16,highl#17,lowl#18,highs#19,lows#20,valuelist#21,hashvalue#22] Batched: false, Format: ORC, Location: CatalogFileIndex[hdfs://hdp-m-01:8020/apps/hive/warehouse/mydb.db/mytab..., PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<importtime:timestamp,tester:string,testerhead:string,matchcode:string,revision:string,user...
Created 09-12-2018 01:44 PM
@Daniel Müller As I thought, the PushedFilters are empty. I checked the spark.sql.orc.filterPushdown implementation details and it looks like LIMIT is not supported. You can read more by looking at the inline comments here:
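For comparison, a predicate on a regular column should end up in PushedFilters, while a LIMIT never does. A quick sketch (the column and value are just for illustration):
%spark
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
// Hypothetical predicate just to demonstrate pushdown; pick any real column and value
val dfFiltered = sqlContext.sql("SELECT * FROM mydb.myhugetable WHERE testname = 'abc'")
dfFiltered.explain(true) // the FileScan node should now list PushedFilters such as EqualTo(testname,abc)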
HTH
*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.
Created 09-14-2018 10:27 AM
@Felix Albani Thank you for your help! Without the LIMIT clause, the Job works perfectly (and in parallel).