Retrieving Timestamps
Created 11-18-2024 11:59 PM
I am trying to retrieve data from a table in HDFS in which one column contains timestamps.
I am connected to HDFS through CDSW and run a Python script that opens a Spark session and executes an SQL query to retrieve some rows from the table. When I run the same SQL query in the Hue Impala editor I get the proper values, but in CDSW the Python script returns only None values for the timestamp column. How can I retrieve my data properly? It's a huge table, so I cannot simply export a CSV file from the Impala editor. I want to retrieve the data for migration to another database. The script I run in CDSW is the following:
import os
import sys
import pandas as pd
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import lit, unix_timestamp, to_utc_timestamp
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
os.environ['PROJ_LIB']='/home/cdsw/.conda/envs/python3.6/share/proj'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
spark = SparkSession.builder\
.master("yarn")\
.config("spark.sql.session.timeZone","UTC")\
.config("spark.submit.deployMode", "client")\
.config("spark.eventLog.enabled", "true")\
.config("spark.executor.instances", "30")\
.config("spark.executor.cores", "2")\
.config("spark.executor.memory", "4g")\
.config("spark.rpc.message.maxSize", "1024")\
.config("spark.executor.memoryOverhead", "800")\
.config("spark.driver.memory", "4g")\
.config("spark.driver.memoryOverhead", "800")\
.config("spark.spark.driver.maxResultSize", "4g")\
.config("spark.executor.dynamicAllocation.initialExecutors", "false")\
.config("spark.executor.dynamicAllocation.minExecutors", "false")\
.config("spark.executor.dynamicAllocation.maxExecutors", "false")\
.config("spark.sql.broadcastTimeout", "1000")\
.config("spark.kryoserializer.buffer.max", "1024m")\
.config("spark.sql.execution.arrow.pyspark.enabled", "true")\
.config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
hiveaction = sqlContext.sql('SET hive.exec.dynamic.partition = true')
hiveaction = sqlContext.sql('SET hive.exec.dynamic.partition.mode = nonstrict')
# Show all columns
pd.set_option("display.max_rows", None, "display.max_columns", None)
qry ="""SELECT parameter_id, measurement_time , value, par_dt FROM aums.eems_archive_data WHERE par_dt = '20240101' LIMIT 10"""
spark_df = spark.sql(qry)
data_df = spark_df.toPandas()
print(data_df.head(1))
where 'measurement_time' returns None values instead of dates and times.
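Not part of the original post, but a minimal diagnostic sketch that could be appended to the script above (it reuses the spark session and the qry string defined there): printing the Spark-side schema and a few raw rows shows whether the NULLs already appear in the Spark DataFrame, i.e. whether the problem is in reading the files rather than in the toPandas() conversion.
spark_df = spark.sql(qry)
# Confirm how Spark sees the column: timestamp vs. string vs. bigint.
spark_df.printSchema()
# Look at the raw values before converting to pandas; if they are already NULL
# here, the issue is in reading the files, not in the Arrow/pandas conversion.
spark_df.select("parameter_id", "measurement_time").show(10, truncate=False)
# Count how many rows in this partition actually carry a non-null timestamp.
print(spark_df.filter(spark_df.measurement_time.isNotNull()).count())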
Created 11-19-2024 11:16 PM
@Nikitas, Welcome to our community! To help you get the best possible answer, I have tagged in our experts @ChethanYM @satz who may be able to assist you further.
Please feel free to provide any additional information or details about your query. We hope that you will find a satisfactory solution to your question.
Regards,
Vidya Sargur, Community Manager
Created 11-20-2024 01:29 AM
@Nikitas Thank you for posting your query with us.
We would like to check the following items:
1. Is the data in the table stored as Parquet, Avro, or another format?
2. Were you able to remove the following configurations and still read the timestamps?
config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY") \
.config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
3. Are these files written by Hive, Spark, or an external application?
4. Have you tried setting the above modes to "CORRECTED" (for Parquet files generated by a recent Hive or other application), and did that work for that column? (See the sketch after this list.)
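A minimal sketch illustrating points 1 and 4 (the table name comes from the question; everything else is an assumption, and it reuses the session created in the script above):
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # returns the already-running session
# Point 1: check how the table is stored (look at the InputFormat / SerDe rows of the output).
spark.sql("DESCRIBE FORMATTED aums.eems_archive_data").show(100, truncate=False)
# Point 4: the rebase modes are runtime SQL confs, so they can be switched to
# CORRECTED without rebuilding the session, and the query re-run for comparison.
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "CORRECTED")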
Satz
Created 11-20-2024 01:51 AM
@satz
1. I think the files stored in the table are Parquet, but I will check.
2. I have already run the script without these configurations and got None values. That is why I added them to my Spark session in CDSW.
3. The files are fed into HDFS from an SFTP server, using hand-made bash scripts that transform the raw data.
4. Changing "LEGACY" to "CORRECTED" changed nothing. I expect
parameter_id measurement_time value par_dt
0 d7cc8e82-7ad1 2024-01-01 01:34:24 13.45 20240101
1 d7caa072-7ad1 2024-01-01 01:44:50 28.00 20240101
and I get
parameter_id measurement_time value par_dt
0 d7cc8e82-7ad1 None 13.45 20240101
1 d7caa072-7ad1 None 28.00 20240101
Created 11-21-2024 03:53 AM
@satz I just noticed that when I run the same query in Hive instead of the Impala editor, the measurement_time column shows only NULL values. Does that mean that the files are written by Hive? I would really appreciate any further suggestions!
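Not suggested in the thread itself, but one way to check whether the timestamps exist at the storage layer, independently of the Hive/Impala table definition, is to read the partition's Parquet files directly; the HDFS path below is only a guess and must be replaced with the real location (e.g. taken from DESCRIBE FORMATTED):
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Hypothetical partition path; adjust to the table's actual location.
path = "/user/hive/warehouse/aums.db/eems_archive_data/par_dt=20240101"
raw = spark.read.parquet(path)
raw.printSchema()                                        # type of measurement_time as written in the files
raw.select("measurement_time").show(5, truncate=False)   # are the values present at file level?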
Created 03-21-2025 06:29 AM
To identify which user is writing the files, use HDFS CLI commands such as ls or getfacl, for example as in the sketch below.
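A small sketch of that check run from Python in the CDSW session (the partition path is hypothetical; hdfs dfs -ls and hdfs dfs -getfacl are standard HDFS CLI commands):
import subprocess
# Hypothetical HDFS location of the table partition; adjust to the real path.
table_path = "/user/hive/warehouse/aums.db/eems_archive_data/par_dt=20240101"
# List the partition files; the owning user appears in the third column of the output.
print(subprocess.run(["hdfs", "dfs", "-ls", table_path], capture_output=True, text=True).stdout)
# Show the owner, group, and any extended ACL entries of the files.
print(subprocess.run(["hdfs", "dfs", "-getfacl", "-R", table_path], capture_output=True, text=True).stdout)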
