Created 12-11-2015 05:46 AM
Hi,
I am using HDP 2.3.2 with Spark 1.4.1 and trying to insert data into a Hive table using HiveContext.
Below is the sample code:
spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

//Sample code
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("/user/spark/people.txt")
val schemaString = "name age"
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
//Create hive context
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//Apply the schema to the RDD
val df = hiveContext.createDataFrame(rowRDD, schema)
val options = Map("path" -> "hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/personhivetable")
df.write.format("org.apache.spark.sql.hive.orc.DefaultSource").options(options).saveAsTable("personhivetable")
Getting the error below:
org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$writeRows$1(commands.scala:191)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$anonfun$insert$1.apply(commands.scala:160)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$anonfun$insert$1.apply(commands.scala:160)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
	at $line30.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$2.apply(<console>:29)
	at $line30.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$2.apply(<console>:29)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$writeRows$1(commands.scala:182)
	... 8 more
Is it a configuration issue?
When I googled it, I found that an environment variable named HIVE_CONF_DIR should be set in spark-env.sh.
Then I checked spark-env.sh in HDP 2.3.2, but I couldn't find an environment variable named HIVE_CONF_DIR.
Do I need to add the above-mentioned variable to insert Spark output data into Hive tables?
Would really appreciate pointers.
Thanks,
Divya
Created 12-11-2015 01:25 PM
If you want the table to be accessible from Hive as well, you cannot use saveAsTable. If you use saveAsTable, only Spark SQL will be able to use it.
You have two ways to create ORC tables from Spark that are compatible with Hive. I tested the code below with the HDP 2.3.2 sandbox and Spark 1.4.1.
1- Save an ORC file from Spark and create an external table directly in Hive; see this code:
spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val people = sc.textFile("/tmp/people.txt")
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val df = sqlContext.createDataFrame(rowRDD, schema)
sqlContext.sql("drop table if exists personhivetable")
sqlContext.sql("create external table personhivetable (name string, age string) stored as orc location '/tmp/personhivetable/'")
df.write.format("orc").mode("overwrite").save("/tmp/personhivetable")
sqlContext.sql("show tables").collect().foreach(println)
sqlContext.sql("select * from personhivetable").collect().foreach(println)
2- Register your DataFrame as a temporary table and perform a CREATE TABLE AS SELECT:
spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val people = sc.textFile("/tmp/people.txt")
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("personhivetable_tmp")
sqlContext.sql("drop table if exists personhivetable2")
sqlContext.sql("CREATE TABLE personhivetable2 STORED AS ORC AS SELECT * from personhivetable_tmp")
sqlContext.sql("show tables").collect().foreach(println)
sqlContext.sql("select * from personhivetable2").collect().foreach(println)
Also, check this question for more discussion about ORC + Spark:
https://community.hortonworks.com/questions/4292/how-do-i-create-an-orc-hive-table-from-spark.html
Created 12-13-2015 04:49 PM
Hi Guil,
What is the benefit of running HiveQL queries in spark-shell?
Can't I change the hive.execution.engine to spark and do the querying?
thanks
Created 12-16-2015 02:48 AM
See this post about Spark vs Tez:
https://community.hortonworks.com/questions/5408/spark-vs-tez.html#comment-6248
Created 12-15-2015 08:52 PM
ORC is only supported in HiveContext, but here SQLContext is used.
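To illustrate the point, here is a minimal sketch (not from the original posts; the /tmp/people.txt path and output location are just examples) of routing the same ORC write through a HiveContext instead of a plain SQLContext:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Build the DataFrame from a HiveContext so the ORC data source is available
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val people = sc.textFile("/tmp/people.txt")
val schema = StructType("name age".split(" ").map(f => StructField(f, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val df = hiveContext.createDataFrame(rowRDD, schema)

// The short name "orc" resolves here because the DataFrame is backed by a HiveContext
df.write.format("orc").mode("overwrite").save("/tmp/personhivetable")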
Created 12-15-2015 08:53 PM
HiveQL is richer in functionality than plain Spark SQL. When possible, HiveContext is recommended.
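For example (a sketch, assuming the personhivetable2 table created in the second example above exists), window functions are a HiveQL feature that works through HiveContext in Spark 1.4 but is not accepted by the basic SQLContext parser:

// Assumes personhivetable2 from the CTAS example above; ranking rows with a window function
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SELECT name, age, rank() OVER (ORDER BY age DESC) AS age_rank FROM personhivetable2").collect().foreach(println)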