
ORC Table Timestamp PySpark 2.1 CAST Issue

Rising Star

All,

I have a table with 3 columns, stored in ORC format; the data is as below

+--------------+-------------+--------------------------+--+
| vehicle_hdr  | vehicle_no  |    incident_timestamp    |
+--------------+-------------+--------------------------+--+
| XXXX         | 3911        | 1969-06-19 06:57:26.485  |
| XXXX         | 3911        | 1988-06-21 05:36:22.35   |
+--------------+-------------+--------------------------+--+

The DDL for the table is as below

create table test (vehicle_hdr string, vehicle_no string, incident_timestamp timestamp) stored as ORC;

From the Hive beeline I am able to view the results, but when I am using PySpark 2.1 and running the below code

o1 = sqlContext.sql("select vehicle_hdr, incident_timestamp  from test")

I am getting the below error

Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
        at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:393)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:392)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:416)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
8 REPLIES


Hi @Jayadeep Jayaraman

I have just tested the same in PySpark 2.1. That works fine at my site. See below:

beeline

0: jdbc:hive2://dkhdp262.openstacklocal:2181,> create table test_orc (b string,t timestamp) stored as ORC;
0: jdbc:hive2://dkhdp262.openstacklocal:2181,> select * from test_orc;
+-------------+------------------------+--+
| test_orc.b  |       test_orc.t       |
+-------------+------------------------+--+
| a           | 2017-06-13 05:02:23.0  |
| b           | 2017-06-13 05:02:23.0  |
| c           | 2017-06-13 05:02:23.0  |
| d           | 2017-06-13 05:02:23.0  |
| e           | 2017-06-13 05:02:23.0  |
| f           | 2017-06-13 05:02:23.0  |
| g           | 2017-06-13 05:02:23.0  |
| h           | 2017-06-13 05:02:23.0  |
| i           | 2017-06-13 05:02:23.0  |
| j           | 2017-06-13 05:02:23.0  |
+-------------+------------------------+--+
10 rows selected (0.091 seconds)

pyspark

[root@dkhdp262 ~]# export SPARK_MAJOR_VERSION=2
[root@dkhdp262 ~]# pyspark
SPARK_MAJOR_VERSION is set to 2, using Spark2
Python 2.7.5 (default, Jun 17 2014, 18:11:42)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__/ .__/\_,_/_/ /_/\_\   version 2.1.1.2.6.1.0-129
      /_/


Using Python version 2.7.5 (default, Jun 17 2014 18:11:42)
SparkSession available as 'spark'.
>>> sqlContext.sql("select b, t from test_orc").show()
+---+--------------------+
|  b|                   t|
+---+--------------------+
|  a|2017-06-13 05:02:...|
|  b|2017-06-13 05:02:...|
|  c|2017-06-13 05:02:...|
|  d|2017-06-13 05:02:...|
|  e|2017-06-13 05:02:...|
|  f|2017-06-13 05:02:...|
|  g|2017-06-13 05:02:...|
|  h|2017-06-13 05:02:...|
|  i|2017-06-13 05:02:...|
|  j|2017-06-13 05:02:...|
+---+--------------------+

Based on the error you have - is the timestamp value in your table a REAL timestamp? How did you insert it?
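
One way to check what is physically stored - rather than what the metastore declares - is to read the table's ORC files directly with Spark. A minimal sketch; the warehouse path below is an assumption, so substitute your table's actual location:

# Read the ORC files directly, bypassing the Hive metastore, to see
# the schema that was actually written into the files.
df = spark.read.orc("/apps/hive/warehouse/test")
df.printSchema()
# If incident_timestamp is reported here as string while the Hive DDL
# declares timestamp, reads through the metastore will fail with the
# Text -> TimestampWritable ClassCastException above.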

Rising Star

Thanks @Daniel, the timestamps in my case are real timestamps coming from our sensors. As can be seen, the values 1969-06-19 06:57:26.485 and 1988-06-21 05:36:22.35 are in my table.

I inserted the data from a PySpark program; code snippet below:

# keep only the rows belonging to the first partition value
write_df = final_df.where(col(first_partitioned_column).isin(format(first_partition)))
# note: drop() returns a new DataFrame; the result is not assigned, so this line has no effect
write_df.drop(first_partitioned_column)
# write the ORC files, partitioned, directly to the table path
write_df.write.mode("overwrite").format("orc").partitionBy(first_partitioned_column).save(path)

One thing I observed was that the timestamp column in write_df was of string datatype, not timestamp, but my assumption was that Spark would do the cast internally when a DataFrame column is string and the table column is of timestamp type.
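
For reference, the column types Spark will write can be checked before calling save - a minimal sketch:

# On a path-based ORC save, the DataFrame's own column types are what
# end up in the files; printSchema shows them.
write_df.printSchema()
# root
#  |-- vehicle_hdr: string (nullable = true)
#  |-- vehicle_no: string (nullable = true)
#  |-- incident_timestamp: string (nullable = true)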

Another thing to note is that from beeline I am able to query the results without any issues.

Thanks in advance.


Hi @Jayadeep Jayaraman

I have just done another test, treating the timestamp as a string. That works for me as well. See below:

beeline

> create table test_orc_t_string (b string,t timestamp) stored as ORC;
> insert into table test_orc_t_string values('a', '1969-06-19 06:57:26.485'),('b','1988-06-21 05:36:22.35');
> select * from test_orc_t_string;
+----------------------+--------------------------+--+
| test_orc_t_string.b  |   test_orc_t_string.t    |
+----------------------+--------------------------+--+
| a                    | 1969-06-19 06:57:26.485  |
| b                    | 1988-06-21 05:36:22.35   |
+----------------------+--------------------------+--+
2 rows selected (0.128 seconds)

pyspark

>>> sqlContext.sql("select * from test_orc_t_string").show()
+---+--------------------+
|  b|                   t|
+---+--------------------+
|  a|1969-06-19 06:57:...|
|  b|1988-06-21 05:36:...|
+---+--------------------+

Can you test the above at your site? Let me know how this works.

Can you also send me the output of the below from beeline:

show create table test;

Rising Star

Hi @Daniel Kozlowski,

I have tested the above case and it works fine on my end as well. Also, I created a table with the timestamp column as string, inserted the data from this temp table into the main table with the timestamp datatype, and from Spark I am able to read the data without any issues.

I guess the issue arises when I am inserting data from Spark into Hive and reading it back.

Rising Star

Also, see below the structure of the DataFrame before the write method is called:

DataFrame[vehicle_hdr: string, vehicle_no: string, incident_timestamp: string]


@Jayadeep Jayaraman

It is good to hear the sample works.

I have a feeling the problem may be with the way you created your original table.

Hence, try another thing: point your code to the test_orc_t_string table, the one from my sample above, and check if that works.

Rising Star

Hi @Daniel Kozlowski,

I resolved the issue. It was because the input table I had defined had a string datatype; I used a cast function inside my Spark code and now everything is working fine. Thanks for your help.
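
Presumably the fix looked something like the following - a sketch only, reusing the column name and write call from earlier in the thread:

from pyspark.sql.functions import col

# Cast the string column to a real timestamp before writing, so the
# ORC files contain timestamp values matching the Hive DDL.
write_df = write_df.withColumn(
    "incident_timestamp", col("incident_timestamp").cast("timestamp")
)
write_df.write.mode("overwrite").format("orc").partitionBy(first_partitioned_column).save(path)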


Hi @Jayadeep Jayaraman

That is great - thanks for letting me know.