
Spark LLAP Connector : Does not read tables created via the saveAsTable SPARK dataframe method

Hi,

I’ve encountered an issue while using the spark-llap connector (https://github.com/hortonworks-spark/spark-llap).

The connector does not seem to be able to read tables whose schemas were created using the Spark DataFrame saveAsTable method, even though the same tables are perfectly readable via Hive.

Consider the following CSV file:

1,Mackenzy,Smith,US,1993-12-18,123-456-7890
2,Sherlyn,Miller,US,1975-03-22,234-567-8901
3,Khiana,Wilson,US,1989-08-14,345-678-9012
4,Jack,Thompson,US,1962-10-28,456-789-0123
5,Audrey,Taylor,UK,1985-01-11,12-3456-7890
6,Ruford,Walker,UK,1976-05-19,23-4567-8901
7,Marta,Lloyd,UK,1981-07-23,34-5678-9012
8,Derick,Schneider,DE,1982-04-17,12-345-67890
9,Anna,Richter,DE,1995-09-07,23-456-78901
10,Raina,Graf,DE,1999-02-06,34-567-89012
11,Felix,Lee,CA,1982-04-17,321-654-0987
12,Adam,Brown,CA,1995-09-07,432-765-1098
13,Lucas,Jones,CA,1999-02-06,543-876-2109
14,Yvonne,Dupont,FR,1982-04-17,01-23-45-67-89
15,Pascal,Fournier,FR,1995-09-07,23-45-67-89-01
16,Ariel,Simon,FR,1999-02-06,34-56-78-90-12

The file is used to create Hive tables from Spark in two ways: via the saveAsTable DataFrame method and via HiveQL.

// read CSV file
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
 
val hdfsNameNode = "{nameNodeURI}" // placeholder for the NameNode host
val hdfsLocation = "/tmp/llap_spark_test/customers.csv"
val csv_uri = "hdfs://%s:8020/%s".format(hdfsNameNode,hdfsLocation)
 
val schemaString = "id,name_first,name_last,addr_country,date_of_birth,phone_num"
val schema = StructType(schemaString.split(",")
                .map(fieldName => StructField(fieldName, StringType, true)))
 
val customerDf = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter",",")
    .schema(schema)
    .load(csv_uri)
    
//read back schema 
customerDf.printSchema
customerDf.show(2)  
 
 
// Save as Hive table "customers_llap" via the Spark saveAsTable method
customerDf.write.format("orc").mode("overwrite").saveAsTable("employees.customers_llap")
 
 
// Save as Hive table "customers_llap_bis" via HiveQL
customerDf.registerTempTable("customers_llap_tmp")
sqlContext.sql("CREATE TABLE IF NOT EXISTS employees.customers_llap_bis (id  int, name_first string, name_last string, addr_country string, date_of_birth string, phone_num string) stored as ORC")
sqlContext.sql("INSERT OVERWRITE TABLE employees.customers_llap_bis SELECT * FROM customers_llap_tmp")

Reading the tables back using the Spark HiveContext works fine:

Table customers_llap

sqlContext.sql("SELECT * FROM employees.customers_llap").show

Result:

16/12/30 19:20:36 INFO DAGScheduler: Job 1 finished: show at <console>:26, took 6.418023 s
+---+----------+---------+------------+-------------+--------------+
| id|name_first|name_last|addr_country|date_of_birth|     phone_num|
+---+----------+---------+------------+-------------+--------------+
|  1|  Mackenzy|    Smith|          US|   1993-12-18|  123-456-7890|
|  2|   Sherlyn|   Miller|          US|   1975-03-22|  234-567-8901|
|  3|    Khiana|   Wilson|          US|   1989-08-14|  345-678-9012|
|  4|      Jack| Thompson|          US|   1962-10-28|  456-789-0123|
|  5|    Audrey|   Taylor|          UK|   1985-01-11|  12-3456-7890|
|  6|    Ruford|   Walker|          UK|   1976-05-19|  23-4567-8901|
|  7|     Marta|    Lloyd|          UK|   1981-07-23|  34-5678-9012|
|  8|    Derick|Schneider|          DE|   1982-04-17|  12-345-67890|
|  9|      Anna|  Richter|          DE|   1995-09-07|  23-456-78901|
| 10|     Raina|     Graf|          DE|   1999-02-06|  34-567-89012|
| 11|     Felix|      Lee|          CA|   1982-04-17|  321-654-0987|
| 12|      Adam|    Brown|          CA|   1995-09-07|  432-765-1098|
| 13|     Lucas|    Jones|          CA|   1999-02-06|  543-876-2109|
| 14|    Yvonne|   Dupont|          FR|   1982-04-17|01-23-45-67-89|
| 15|    Pascal| Fournier|          FR|   1995-09-07|23-45-67-89-01|
| 16|     Ariel|    Simon|          FR|   1999-02-06|34-56-78-90-12|
+---+----------+---------+------------+-------------+--------------+

Table customers_llap_bis

sqlContext.sql("SELECT * FROM employees.customers_llap_bis").show

Result:

16/12/30 19:41:34 INFO DAGScheduler: Job 5 finished: show at <console>:27, took 0.086317 s
+---+----------+---------+------------+-------------+--------------+
| id|name_first|name_last|addr_country|date_of_birth|     phone_num|
+---+----------+---------+------------+-------------+--------------+
|  1|  Mackenzy|    Smith|          US|   1993-12-18|  123-456-7890|
|  2|   Sherlyn|   Miller|          US|   1975-03-22|  234-567-8901|
|  3|    Khiana|   Wilson|          US|   1989-08-14|  345-678-9012|
|  4|      Jack| Thompson|          US|   1962-10-28|  456-789-0123|
|  5|    Audrey|   Taylor|          UK|   1985-01-11|  12-3456-7890|
|  6|    Ruford|   Walker|          UK|   1976-05-19|  23-4567-8901|
|  7|     Marta|    Lloyd|          UK|   1981-07-23|  34-5678-9012|
|  8|    Derick|Schneider|          DE|   1982-04-17|  12-345-67890|
|  9|      Anna|  Richter|          DE|   1995-09-07|  23-456-78901|
| 10|     Raina|     Graf|          DE|   1999-02-06|  34-567-89012|
| 11|     Felix|      Lee|          CA|   1982-04-17|  321-654-0987|
| 12|      Adam|    Brown|          CA|   1995-09-07|  432-765-1098|
| 13|     Lucas|    Jones|          CA|   1999-02-06|  543-876-2109|
| 14|    Yvonne|   Dupont|          FR|   1982-04-17|01-23-45-67-89|
| 15|    Pascal| Fournier|          FR|   1995-09-07|23-45-67-89-01|
| 16|     Ariel|    Simon|          FR|   1999-02-06|34-56-78-90-12|
+---+----------+---------+------------+-------------+--------------+

But it fails when trying to read back the table created via the saveAsTable Spark DataFrame method when using an LlapContext:

//Create an LlapContext
import org.apache.spark.sql.hive.llap.LlapContext
val llapContext = new LlapContext(sc)

//Read back the table created using saveAsTable ("customers_llap")
llapContext.sql("SELECT * FROM employees.customers_llap").show(5)

Result:

scala> llapContext.sql("SELECT * FROM employees.customers_llap").show(5)
16/12/30 19:41:54 INFO ParseDriver: Parsing command: SELECT * FROM employees.customers_llap
16/12/30 19:41:54 INFO ParseDriver: Parse Completed
java.lang.Exception: Expected MetastoreRelation
	at org.apache.spark.sql.hive.llap.LlapCatalog.lookupRelation(LlapContext.scala:116)
	at org.apache.spark.sql.hive.llap.LlapContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(LlapContext.scala:47)
	at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:161)
	at org.apache.spark.sql.hive.llap.LlapContext$$anon$1.lookupRelation(LlapContext.scala:47)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:302)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:314)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:309)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:309)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
	at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
	at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
	at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
	at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
	at $iwC$$iwC$$iwC.<init>(<console>:44)
	at $iwC$$iwC.<init>(<console>:46)
	at $iwC.<init>(<console>:48)
	at <init>(<console>:50)
	at .<init>(<console>:54)
	at .<clinit>(<console>)
	at .<init>(<console>:7)
	at .<clinit>(<console>)
	at $print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
	at org.apache.spark.repl.Main$.main(Main.scala:31)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
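
The "Expected MetastoreRelation" message suggests that LlapCatalog.lookupRelation only accepts tables that resolve to a Hive MetastoreRelation, while the table written by saveAsTable resolves to a Spark SQL data source relation instead. A rough way to confirm this from the plain HiveContext (a sketch; the exact plan text depends on the Spark build) is to print the analyzed plans:

// Inspect how the regular HiveContext resolves each table
println(sqlContext.table("employees.customers_llap").queryExecution.analyzed)
// expected (assumption): a LogicalRelation over the ORC data source written by saveAsTable
println(sqlContext.table("employees.customers_llap_bis").queryExecution.analyzed)
// expected (assumption): a MetastoreRelation, which is the shape LlapCatalog handles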

Reading back the table created using HiveQL ("customers_llap_bis") works fine:

//Read back the table created using HiveQL ("customers_llap_bis")
llapContext.sql("SELECT * FROM employees.customers_llap_bis").show(5)

Result:

16/12/30 19:42:53 INFO DAGScheduler: Job 6 finished: show at <console>:31, took 2.519766 s
+---+----------+---------+------------+-------------+------------+
| id|name_first|name_last|addr_country|date_of_birth|   phone_num|
+---+----------+---------+------------+-------------+------------+
|  1|  Mackenzy|    Smith|          US|   1993-12-18|123-456-7890|
|  2|   Sherlyn|   Miller|          US|   1975-03-22|234-567-8901|
|  3|    Khiana|   Wilson|          US|   1989-08-14|345-678-9012|
|  4|      Jack| Thompson|          US|   1962-10-28|456-789-0123|
|  5|    Audrey|   Taylor|          UK|   1985-01-11|12-3456-7890|
+---+----------+---------+------------+-------------+------------+
only showing top 5 rows

Reading the table created using the saveAsTable Spark DataFrame method directly via Hive works fine too:

beeline -u jdbc:hive2://{LLAP JDBC URL}:10500/ -n spark -e "SELECT * FROM employees.customers_llap limit 5"

Result:

Connected to: Apache Hive (version 2.1.0.2.5.3.0-37)
Driver: Hive JDBC (version 1.2.1000.2.5.3.0-37)
Transaction isolation: TRANSACTION_REPEATABLE_READ
INFO  : Compiling command(queryId=hive_20161230194324_5ac93edb-bf0b-4a6b-8246-76d33374a096): SELECT * FROM employees.customers_llap limit 5
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:customers_llap.id, type:string, comment:null), FieldSchema(name:customers_llap.name_first, type:string, comment:null), FieldSchema(name:customers_llap.name_last, type:string, comment:null), FieldSchema(name:customers_llap.addr_country, type:string, comment:null), FieldSchema(name:customers_llap.date_of_birth, type:string, comment:null), FieldSchema(name:customers_llap.phone_num, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20161230194324_5ac93edb-bf0b-4a6b-8246-76d33374a096); Time taken: 0.198 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20161230194324_5ac93edb-bf0b-4a6b-8246-76d33374a096): SELECT * FROM employees.customers_llap limit 5
INFO  : Completed executing command(queryId=hive_20161230194324_5ac93edb-bf0b-4a6b-8246-76d33374a096); Time taken: 0.003 seconds
INFO  : OK
+--------------------+----------------------------+---------------------------+------------------------------+-------------------------------+---------------------------+--+
| customers_llap.id  | customers_llap.name_first  | customers_llap.name_last  | customers_llap.addr_country  | customers_llap.date_of_birth  | customers_llap.phone_num  |
+--------------------+----------------------------+---------------------------+------------------------------+-------------------------------+---------------------------+--+
| 1                  | Mackenzy                   | Smith                     | US                           | 1993-12-18                    | 123-456-7890              |
| 2                  | Sherlyn                    | Miller                    | US                           | 1975-03-22                    | 234-567-8901              |
| 3                  | Khiana                     | Wilson                    | US                           | 1989-08-14                    | 345-678-9012              |
| 4                  | Jack                       | Thompson                  | US                           | 1962-10-28                    | 456-789-0123              |
| 5                  | Audrey                     | Taylor                    | UK                           | 1985-01-11                    | 12-3456-7890              |
+--------------------+----------------------------+---------------------------+------------------------------+-------------------------------+---------------------------+--+
5 rows selected (0.332 seconds)

Re: Spark LLAP Connector : Does not read tables created via the saveAsTable SPARK dataframe method

They are still working out some of the kinks, so your problem appears to come from mixing the HiveContext with the LlapContext. Considering how long it took to get the SQLContext and HiveContext to behave the same, it doesn't surprise me that there would be issues when mixing the LlapContext with the HiveContext.

Try creating your table in Spark using the LlapContext instead of the HiveContext (which is what saveAsTable goes through by default). You could also try not forcing ORC and letting it use the default Parquet format, but I doubt that would help, although I have seen stranger things.

llapContext.sql("CREATE TABLE ...")

Being as new as it is, I would try to keep things as close to the happy path as possible.
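
A minimal sketch of that suggestion, reusing the schema and csv_uri variables from the question (the names customers_llap_tmp2 and employees.customers_llap2 are made up here, and since temporary tables are per-context, the DataFrame is re-read through the LlapContext; whether writes through the LlapContext behave exactly like the HiveContext is an assumption):

// Build the DataFrame with the LlapContext so the temp table is registered with it
val customerDf2 = llapContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .schema(schema)
    .load(csv_uri)

customerDf2.registerTempTable("customers_llap_tmp2")

// Create and load a plain Hive ORC table via HiveQL, as was done for customers_llap_bis
llapContext.sql("CREATE TABLE IF NOT EXISTS employees.customers_llap2 (id int, name_first string, name_last string, addr_country string, date_of_birth string, phone_num string) STORED AS ORC")
llapContext.sql("INSERT OVERWRITE TABLE employees.customers_llap2 SELECT * FROM customers_llap_tmp2")

// The new table should now resolve as a MetastoreRelation and be readable through LLAP
llapContext.sql("SELECT * FROM employees.customers_llap2").show(5)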
