Created 06-08-2022 12:42 AM
First we created a table based on JSON dataset from hive cli using the below query
CREATE EXTERNAL TABLE json10( fruit string, size string, color string ) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3a://json2/' ;
we are able to execute select queries form hive cli on top of the table created above, but unable to execute the same from pyspark script.
pyspark script
from pyspark.context import SparkContext sc=SparkContext() from pyspark.sql import HiveContext hive_context = HiveContext(sc) json=hive_context.table("default.json") hive_context.sql("select * from json").show()
ERROR MESSAGE
22/06/07 15:24:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, b7-36.lab.archivas.com, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2436) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)
we also tried providing the hive Hcatalog Jars in the pyspark script and landed in below error message
python script:
from pyspark.context import SparkContext sc=SparkContext() from pyspark.sql import HiveContext hive_context = HiveContext(sc) json=hive_context.table("default.json") hive_context.sql("ADD JAR /opt/cloudera/parcels/CDH/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar") hive_context.sql("select * from json").show()
ERROR MESSAGE:
22/06/07 15:15:30 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on b7-38.lab.archivas.com:46455 in memory (size: 6.0 KB, free: 366.3 MB) 22/06/07 15:15:30 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, b7-38.lab.archivas.com, executor 1): org.apache.hadoop.hive.serde2.SerDeException: java.io.IOException: Start token not found where expected at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:182) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:487) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:486) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:645) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:265) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.io.IOException: Start token not found where expected at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:170) ... 25 more
can anyone suggest additional parameters or configuration to be set to make Json tables (created in Hive ) to work from pyspark script. Also please note that CSV & parquet dataset are working fine
Created 08-31-2022 09:45 PM
Hi @mmk
By default, Hive will load all SerDe under the hive/lib location. So you are able to do the create/insert/select operations.
In order to read the Hive table created with Custom or external SerDe we need to provide to spark, so spark internally use those libraries and it will load the Hive table data. If you are not provided the serde you can see the following exception:
org.apache.hadoop.hive.serde2.SerDeException
Please add the following library to the spark-submit command:
json-serde-<version>.jar