First, we created a table over a JSON dataset from the Hive CLI using the query below:
CREATE EXTERNAL TABLE json10 (
  fruit string,
  size string,
  color string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3a://json2/';
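For reference, the HCatalog JsonSerDe with TextInputFormat expects exactly one complete JSON object per line in the files under the table location. Hypothetical sample rows matching the schema above would look like:

{"fruit":"apple","size":"small","color":"red"}
{"fruit":"banana","size":"medium","color":"yellow"}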
We are able to execute SELECT queries from the Hive CLI on top of the table created above, but unable to execute the same from a PySpark script.
PySpark script:
from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hive_context = HiveContext(sc)
json = hive_context.table("default.json")
hive_context.sql("select * from json").show()
ERROR MESSAGE:
22/06/07 15:24:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, b7-36.lab.archivas.com, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2436)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2321)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
We also tried adding the Hive HCatalog JAR in the PySpark script, which resulted in the error message below.
PySpark script:
from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hive_context = HiveContext(sc)
json = hive_context.table("default.json")
hive_context.sql("ADD JAR /opt/cloudera/parcels/CDH/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar")
hive_context.sql("select * from json").show()
ERROR MESSAGE:
22/06/07 15:15:30 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on b7-38.lab.archivas.com:46455 in memory (size: 6.0 KB, free: 366.3 MB)
22/06/07 15:15:30 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, b7-38.lab.archivas.com, executor 1): org.apache.hadoop.hive.serde2.SerDeException: java.io.IOException: Start token not found where expected
    at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:182)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:487)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:486)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:645)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:265)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Start token not found where expected
    at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:170)
    ... 25 more
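For comparison, the same read with the JAR supplied when the session is created (so it reaches both the driver and the executors) rather than via ADD JAR at runtime. This is only a sketch, assuming Spark 2.x and the same CDH parcel path as in the script above; the app name is hypothetical:

from pyspark.sql import SparkSession

# Sketch only: spark.jars distributes the listed JARs to the driver and
# the executors at startup, instead of adding them to a running session.
spark = (
    SparkSession.builder
    .appName("hive-json-table-test")  # hypothetical app name
    .config(
        "spark.jars",
        "/opt/cloudera/parcels/CDH/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar",
    )
    .enableHiveSupport()
    .getOrCreate()
)

spark.sql("select * from default.json").show()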
Can anyone suggest additional parameters or configuration that would make JSON tables created in Hive queryable from a PySpark script? Please note that CSV and Parquet datasets work fine.
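For reference, one way to inspect the raw data independently of the Hive SerDe (a sketch; spark is a SparkSession as in the snippet above):

# spark.read.json also expects one JSON object per line by default,
# the same layout the HCatalog JsonSerDe requires.
df = spark.read.json("s3a://json2/")
df.printSchema()
df.show(5)

# Inspect the raw lines the executors actually see:
for line in spark.sparkContext.textFile("s3a://json2/").take(5):
    print(repr(line))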