Support Questions

Find answers, ask questions, and share your expertise

Retrieving Timestamps

avatar
New Contributor

I am trying to retrieve data from a table in HDFS in which a column contains timestamps.

I am connected in hdfs using CDSW and  running a python script which opens a spark session and run an sql query to retrieve some rows from the table. Although running the same sql query in HUE imala i get the proper values, in CDSW using the python script i get None values only from the timestamp column. How can i retrieve my data properly. It's a huge table so i cannot just export the csv file from the impala editor. I want to retrieve data for migration to another database. The script i run in CDSW is the following:

import pandas as pd

import numpy as np

import sys

from pyspark import SparkContext

from pyspark import SparkConf

from pyspark.sql.functions import lit
from pyspark.sql.functions import unix_timestamp
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_utc_timestamp
from pyspark.sql import SQLContext


os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
os.environ['PROJ_LIB']='/home/cdsw/.conda/envs/python3.6/share/proj'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'

spark = SparkSession.builder\
.master("yarn")\
.config("spark.sql.session.timeZone","UTC")\
.config("spark.submit.deployMode", "client")\
.config("spark.eventLog.enabled", "true")\
.config("spark.executor.instances", "30")\
.config("spark.executor.cores", "2")\
.config("spark.executor.memory", "4g")\
.config("spark.rpc.message.maxSize", "1024")\
.config("spark.executor.memoryOverhead", "800")\
.config("spark.driver.memory", "4g")\
.config("spark.driver.memoryOverhead", "800")\
.config("spark.spark.driver.maxResultSize", "4g")\
.config("spark.executor.dynamicAllocation.initialExecutors", "false")\
.config("spark.executor.dynamicAllocation.minExecutors", "false")\
.config("spark.executor.dynamicAllocation.maxExecutors", "false")\
.config("spark.sql.broadcastTimeout", "1000")\
.config("spark.kryoserializer.buffer.max", "1024m")\
.config("spark.sql.execution.arrow.pyspark.enabled", "true")\
.config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
.getOrCreate()


sc = spark.sparkContext

sqlContext = SQLContext(sc)

hiveaction = sqlContext.sql('SET hive.exec.dynamic.partition = true')
hiveaction = sqlContext.sql('SET hive.exec.dynamic.partition.mode = nonstrict')


# Show all columns
pd.set_option("display.max_rows", None, "display.max_columns", None)

qry ="""SELECT parameter_id, measurement_time , value, par_dt FROM aums.eems_archive_data WHERE par_dt = '20240101' LIMIT 10"""

spark_df = spark.sql(qry)
data_df = spark_df.toPandas()
print(data_df.head(1))

where 'measurement_time ' returns None values instead of dates and times

4 REPLIES 4

avatar
Community Manager

@Nikitas, Welcome to our community! To help you get the best possible answer, I have tagged in our experts  @ChethanYM @satz  who may be able to assist you further.

Please feel free to provide any additional information or details about your query. We hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Learn more about the Cloudera Community:

avatar
Expert Contributor

@Nikitas Thank you for posting your query with us. 

Just we would like to check following items

1. Does the files (data format stored in table) is parquet / avro / any other format ?

2. Were you able to remove the following configurations and able to read the timestamps?

config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.timeParserPolicy", "LEGACY") \

3. Does these files are written by Hive / Spark / external applications ?

4. Have you tried setting these above modes to "CORRECTED" (for parquet files generated by latest Hive or other application) ? and does that worked for that column

 

Thanks,
Satz

avatar
New Contributor

@satz1. I think that the files stored in the table are parquet, but i will check it.

2. I have already run the script without these configurations and get None values. That is why i added these configuration to  my spark session in CDSW.

3. The files are fed in the hdfs from an sftp server, using handmade bash scripts that tranform raw data.

4. Changing the "LEGACY" to "CORRECTED" did change nothing. I expect

parameter_id measurement_time  value    par_dt
0 d7cc8e82-7ad1     2024-01-01 01:34:24  13.45 20240101 
1 d7caa072-7ad1   2024-01-01 01:44:50 28.00 20240101

and i get

parameter_id measurement_time  value    par_dt
0  d7cc8e82-7ad1            None  13.45  20240101
1  d7caa072-7ad1            None  28.00  20240101

 

avatar
New Contributor

@satzI just noticed that when i run the same query in hive instead of impala editor, measurement_time columns shows onl Null values. Does that mean that there files are written by Hive? I would really appreciate any further suggestions!