Unable to execute SELECT queries from PySpark on Hive JSON tables

Explorer

First, we created a table based on a JSON dataset from the Hive CLI using the query below:

CREATE EXTERNAL TABLE json10 (
  fruit string,
  size string,
  color string
)
ROW FORMAT SERDE
  'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3a://json2/';

We are able to execute SELECT queries on the table created above from the Hive CLI, but the same queries fail from a PySpark script.

PySpark script:

from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hive_context = HiveContext(sc)

# Load the Hive table, then query it through the Hive context
json_df = hive_context.table("default.json")
hive_context.sql("select * from json").show()

ERROR MESSAGE:

22/06/07 15:24:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, b7-36.lab.archivas.com, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2436)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

We also tried providing the Hive HCatalog JAR in the PySpark script, which produced the error message below.

PySpark script:

from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hive_context = HiveContext(sc)

# Register the HCatalog JSON SerDe with the session before touching the table
hive_context.sql("ADD JAR /opt/cloudera/parcels/CDH/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar")
json_df = hive_context.table("default.json")
hive_context.sql("select * from json").show()

ERROR MESSAGE:

22/06/07 15:15:30 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on b7-38.lab.archivas.com:46455 in memory (size: 6.0 KB, free: 366.3 MB)
22/06/07 15:15:30 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, b7-38.lab.archivas.com, executor 1): org.apache.hadoop.hive.serde2.SerDeException: java.io.IOException: Start token not found where expected
        at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:182)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:487)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:486)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:645)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:265)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Start token not found where expected
        at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:170)
        ... 25 more

Can anyone suggest additional parameters or configuration needed to make JSON tables (created in Hive) work from a PySpark script? Please also note that CSV and Parquet datasets work fine.

1 REPLY

Super Collaborator

Hi @mmk 

By default, Hive loads every SerDe found under the hive/lib location, which is why the create/insert/select operations succeed from the Hive CLI.
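
For instance, on a CDH parcel installation you can usually confirm which SerDe JARs Hive picks up with a quick listing like the one below (the parcel path is an assumption, modeled on the path already used in the question):

ls /opt/cloudera/parcels/CDH/lib/hive/lib/ | grep -i serde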

To read a Hive table created with a custom or external SerDe, the SerDe library must also be provided to Spark, so that Spark can use it internally to load the Hive table data. If the SerDe is not provided, you will see the following exception:

org.apache.hadoop.hive.serde2.SerDeException

Please add the following library to the spark-submit command:

json-serde-<version>.jar
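
For example, a minimal spark-submit invocation might look like the sketch below. Since the table in the question was created with the HCatalog JsonSerDe, the JAR path from the question is reused here; substitute the path of whichever JSON SerDe your table actually declares, and note that your_script.py is a placeholder name:

spark-submit \
  --jars /opt/cloudera/parcels/CDH/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar \
  your_script.py

Passing the JAR with --jars ships it to the driver and all executors at startup, which avoids the classloading gaps that an ADD JAR issued inside an already-running session can hit on the executor side.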