Unable to execute select queries from Pyspark on hive JSON tables

First, we created a table on top of a JSON dataset from the Hive CLI using the query below:

CREATE EXTERNAL TABLE json10(
fruit string,
size string,
color string
)
ROW FORMAT SERDE
'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3a://json2/'
;
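
For reference, this SerDe reads the underlying files as plain text and expects one JSON object per line. A hypothetical record in s3a://json2/ matching the schema above (sample data, not our actual files) would look like:

{"fruit":"apple","size":"large","color":"red"}
{"fruit":"grape","size":"small","color":"green"}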

 

We are able to execute SELECT queries on the table created above from the Hive CLI, but we are unable to execute the same from a PySpark script.

PySpark script:

from pyspark.context import SparkContext
sc=SparkContext()
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
json=hive_context.table("default.json")
hive_context.sql("select * from json").show()

Error message:

 

22/06/07 15:24:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, b7-36.lab.archivas.com, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2436)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

 

We also tried providing the Hive HCatalog JAR from within the PySpark script and landed on the error message below.

PySpark script:

from pyspark.context import SparkContext
sc=SparkContext()
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
json=hive_context.table("default.json")
hive_context.sql("ADD JAR /opt/cloudera/parcels/CDH/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar")
hive_context.sql("select * from json").show()

 

Error message:

 

22/06/07 15:15:30 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on b7-38.lab.archivas.com:46455 in memory (size: 6.0 KB, free: 366.3 MB)
22/06/07 15:15:30 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, b7-38.lab.archivas.com, executor 1): org.apache.hadoop.hive.serde2.SerDeException: java.io.IOException: Start token not found where expected
        at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:182)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:487)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:486)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:645)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:265)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Start token not found where expected
        at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:170)
        ... 25 more

 

Can anyone suggest additional parameters or configuration to be set to make JSON tables (created in Hive) work from a PySpark script? Please also note that CSV and Parquet datasets are working fine.
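For reference, a minimal sketch of shipping the SerDe JAR through Spark configuration instead of running ADD JAR inside the session (assuming the same CDH parcel path; not verified on our cluster):

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

# Sketch only: put the HCatalog SerDe JAR on the driver and executor classpaths
# via spark.jars, so executors can deserialize the JSON rows.
conf = SparkConf().set(
    "spark.jars",
    "/opt/cloudera/parcels/CDH/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar")
sc = SparkContext(conf=conf)
hive_context = HiveContext(sc)
hive_context.sql("select * from default.json").show()

The same JAR could equally be passed on the command line with spark-submit --jars.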
