Created 11-18-2024 11:59 PM
I am trying to retrieve data from a table in HDFS in which a column contains timestamps.
I am connected to HDFS using CDSW and running a Python script which opens a Spark session and runs a SQL query to retrieve some rows from the table. Although running the same SQL query in the Hue Impala editor returns the proper values, in CDSW the Python script returns only None values for the timestamp column. How can I retrieve my data properly? It is a huge table, so I cannot just export the CSV file from the Impala editor; I want to retrieve the data for migration to another database. The script I run in CDSW is the following:
import os
import sys
import pandas as pd
import numpy as np
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.functions import lit
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import to_utc_timestamp
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
# Point Spark at the Python 3.6 interpreter from the Anaconda parcel
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
os.environ['PROJ_LIB'] = '/home/cdsw/.conda/envs/python3.6/share/proj'
spark = SparkSession.builder\
.master("yarn")\
.config("spark.sql.session.timeZone","UTC")\
.config("spark.submit.deployMode", "client")\
.config("spark.eventLog.enabled", "true")\
.config("spark.executor.instances", "30")\
.config("spark.executor.cores", "2")\
.config("spark.executor.memory", "4g")\
.config("spark.rpc.message.maxSize", "1024")\
.config("spark.executor.memoryOverhead", "800")\
.config("spark.driver.memory", "4g")\
.config("spark.driver.memoryOverhead", "800")\
.config("spark.spark.driver.maxResultSize", "4g")\
.config("spark.executor.dynamicAllocation.initialExecutors", "false")\
.config("spark.executor.dynamicAllocation.minExecutors", "false")\
.config("spark.executor.dynamicAllocation.maxExecutors", "false")\
.config("spark.sql.broadcastTimeout", "1000")\
.config("spark.kryoserializer.buffer.max", "1024m")\
.config("spark.sql.execution.arrow.pyspark.enabled", "true")\
.config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
sqlContext.sql('SET hive.exec.dynamic.partition = true')
sqlContext.sql('SET hive.exec.dynamic.partition.mode = nonstrict')
# Show all rows and columns when printing pandas DataFrames
pd.set_option("display.max_rows", None, "display.max_columns", None)
qry = """SELECT parameter_id, measurement_time, value, par_dt FROM aums.eems_archive_data WHERE par_dt = '20240101' LIMIT 10"""
spark_df = spark.sql(qry)
data_df = spark_df.toPandas()
print(data_df.head(1))
where the measurement_time column returns None values instead of dates and times.
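A quick way to narrow this down is to check whether the nulls already appear in the Spark DataFrame or only after the toPandas() conversion. A minimal sketch, reusing the spark session, table, and qry from the script above:
# Does Spark itself read nulls, or does the pandas conversion introduce them?
from pyspark.sql.functions import col
spark_df = spark.sql(qry)
spark_df.printSchema()  # confirm measurement_time really is a timestamp column
spark_df.show(10, truncate=False)  # nulls here mean the read itself returns null
print(spark_df.filter(col("measurement_time").isNull()).count())  # null count on the Spark side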
Created 11-19-2024 11:16 PM
@Nikitas, Welcome to our community! To help you get the best possible answer, I have tagged our experts @ChethanYM and @satz, who may be able to assist you further.
Please feel free to provide any additional information or details about your query. We hope that you will find a satisfactory solution to your question.
Regards,
Vidya Sargur
Created 11-20-2024 01:29 AM
@Nikitas Thank you for posting your query with us.
We would like to check the following items:
1. Is the data in the table stored as Parquet / Avro / some other file format? (See the sketch below for one way to check.)
2. Were you able to read the timestamps after removing the following configurations?
.config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
3. Are these files written by Hive / Spark / an external application?
4. Have you tried setting the above modes to "CORRECTED" (for Parquet files generated by a recent Hive or other application), and did that work for that column?
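As a hedged illustration of items 1 and 3, the storage format and file location can usually be read off the table metadata, and reading one partition directly shows the schema Spark infers from the files themselves. A minimal sketch, reusing the spark session from the original script (the HDFS path is a placeholder; take the real one from the Location row):
# Inspect the table's storage format and the files' own schema
spark.sql("DESCRIBE FORMATTED aums.eems_archive_data").show(100, truncate=False)
# Check the InputFormat / SerDe Library rows for Parquet vs. Avro,
# and the Location row for the HDFS path of the data files.
raw = spark.read.parquet("/path/to/aums.db/eems_archive_data/par_dt=20240101")  # placeholder path
raw.printSchema()
raw.select("measurement_time").show(5, truncate=False)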
Created 11-20-2024 01:51 AM
@satz
1. I think the files stored in the table are Parquet, but I will check.
2. I have already run the script without these configurations and got None values; that is why I added them to my Spark session in CDSW.
3. The files are fed into HDFS from an SFTP server, using handmade bash scripts that transform the raw data.
4. Changing "LEGACY" to "CORRECTED" changed nothing. I expect
parameter_id measurement_time value par_dt
0 d7cc8e82-7ad1 2024-01-01 01:34:24 13.45 20240101
1 d7caa072-7ad1 2024-01-01 01:44:50 28.00 20240101
and I get
parameter_id measurement_time value par_dt
0 d7cc8e82-7ad1 None 13.45 20240101
1 d7caa072-7ad1 None 28.00 20240101
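With spark.sql.execution.arrow.pyspark.enabled set to "true" in the session, the None values could in principle come from the Arrow-backed toPandas() conversion rather than from the read itself. A small sketch to rule that out, assuming the session and qry from the original script:
# Rule out the Arrow-based toPandas() conversion as the culprit
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
print(spark.sql(qry).toPandas().head(2))  # still None => the nulls come from the read, not the conversion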
Created 11-21-2024 03:53 AM
@satz I just noticed that when I run the same query in Hive instead of the Impala editor, the measurement_time column shows only NULL values. Does that mean that the files are written by Hive? I would really appreciate any further suggestions!
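One known pattern that fits these symptoms, offered here as an assumption to verify rather than a diagnosis: Impala resolves Parquet columns by position by default, while Hive and Spark resolve them by name, so a column whose name or type in the Parquet footer differs from the table definition can show values in Impala but NULL in Hive and Spark. Given the hand-written ingestion scripts, a quick comparison of the two schemas may help (the path is a placeholder; take the real one from DESCRIBE FORMATTED):
# Compare the table definition with what the files actually contain
spark.table("aums.eems_archive_data").printSchema()
raw = spark.read.parquet("/path/to/aums.db/eems_archive_data/par_dt=20240101")  # placeholder path
raw.printSchema()  # is there a field literally named measurement_time, with a timestamp type?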