
org.apache.spark.SparkException: Task failed while writing rows.

Expert Contributor

Hi,

I am using HDP 2.3.2 with Spark 1.4.1 and am trying to insert data into a Hive table using a HiveContext.

Below is the sample code:

spark-shell   --master yarn-client --driver-memory 512m --executor-memory 512m
//Sample code 
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("/user/spark/people.txt")
val schemaString = "name age"
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
//Create hive context 
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//Apply the schema to the RDD of Rows
val df = hiveContext.createDataFrame(rowRDD, schema);
val options = Map("path" ->  "hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/personhivetable")
df.write.format("org.apache.spark.sql.hive.orc.DefaultSource").options(options).saveAsTable("personhivetable")

I am getting the error below:

org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:191)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
	at $line30.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:29)
	at $line30.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:29)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:182)
	... 8 more

Is it a configuration issue?

When I googled it, I found that an environment variable named HIVE_CONF_DIR should be set in spark-env.sh.

Then I checked spark-env.sh in HDP 2.3.2, and I couldn't find an environment variable named HIVE_CONF_DIR.

Do I need to add the above-mentioned variable to insert Spark output data into Hive tables?

Would really appreciate pointers.

Thanks,

Divya

1 ACCEPTED SOLUTION

New Contributor

ORC is only supported in HiveContext, but here SQLContext is used.
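
A minimal sketch of that point, assuming the same people.txt layout and path as in the question (Spark 1.4.x spark-shell, where sc is predefined); it simply uses a HiveContext throughout instead of a plain SQLContext:

// Sketch only: replace the plain SQLContext with a HiveContext, which is where ORC support lives
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val people = sc.textFile("/user/spark/people.txt")
val schema = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val df = hiveContext.createDataFrame(rowRDD, schema)

// "orc" resolves to the Hive ORC data source when used through a HiveContext
df.write.format("orc").mode("overwrite").saveAsTable("personhivetable")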


6 REPLIES


@Divya Gehlot

If you want the table to be accessible from Hive as well, you cannot use saveAsTable; if you use saveAsTable, only Spark SQL will be able to use it.
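
To illustrate that caveat, here is a small sketch reusing the df and hiveContext from the question; the table name is hypothetical:

// Illustrative only: a table created via saveAsTable stores its schema in Spark SQL-specific
// table properties, so Spark SQL can query it but the Hive CLI generally cannot
df.write.format("orc").mode("overwrite").saveAsTable("spark_only_personhivetable")

hiveContext.sql("SELECT * FROM spark_only_personhivetable").show()  // works from Spark SQL
// hive> SELECT * FROM spark_only_personhivetable;   -- not readable from Hive itself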

There are two ways to create ORC tables from Spark that are compatible with Hive. I tested the code below on the HDP 2.3.2 sandbox with Spark 1.4.1.

1- Save ORC files from Spark and create an external table directly in Hive:

spark-shell   --master yarn-client --driver-memory 512m --executor-memory 512m
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val people = sc.textFile("/tmp/people.txt")
val schemaString = "name age"
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

val df = sqlContext.createDataFrame(rowRDD, schema);

sqlContext.sql("drop table if exists personhivetable")

sqlContext.sql("create external table personhivetable (name string, age string) stored as orc location '/tmp/personhivetable/'")

df.write.format("orc").mode("overwrite").save("/tmp/personhivetable")

sqlContext.sql("show tables").collect().foreach(println);

sqlContext.sql("select * from personhivetable").collect().foreach(println);

2- Register the DataFrame as a temporary table and perform a CREATE TABLE AS SELECT:

spark-shell   --master yarn-client --driver-memory 512m --executor-memory 512m
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val people = sc.textFile("/tmp/people.txt")
val schemaString = "name age"
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))


val df = sqlContext.createDataFrame(rowRDD, schema);

// Expose the DataFrame to SQL as a temporary table, then materialize it as a
// Hive-managed ORC table with CREATE TABLE ... AS SELECT
df.registerTempTable("personhivetable_tmp")

sqlContext.sql("drop table if exists personhivetable2")

sqlContext.sql("CREATE TABLE personhivetable2 STORED AS ORC AS SELECT * from personhivetable_tmp")

sqlContext.sql("show tables").collect().foreach(println);

sqlContext.sql("select * from personhivetable2").collect().foreach(println);

Also, check this question for more discussion about ORC + Spark:

https://community.hortonworks.com/questions/4292/how-do-i-create-an-orc-hive-table-from-spark.html

Contributor

Hi Guil,

What is the benefit of running Hive QL in spark-shell?

Can't I change hive.execution.engine to spark and do the querying?

Thanks

New Contributor

Hive QL is richer in functionality than Spark SQL. When possible, HiveContext is recommended.
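
For completeness, a minimal sketch of what that looks like (on the HDP spark-shell, sqlContext is already such a context; the query reuses the personhivetable2 table created in the earlier reply):

// A HiveContext understands HiveQL and the Hive metastore, which a plain SQLContext does not
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("SHOW TABLES").collect().foreach(println)
hiveContext.sql("SELECT name, age FROM personhivetable2").collect().foreach(println)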