Member since 11-18-2024 · 3 Posts · 2 Kudos Received · 0 Solutions
11-21-2024 03:53 AM · 1 Kudo
@satz I just noticed that when I run the same query in Hive instead of the Impala editor, the measurement_time column shows only NULL values. Does that mean the files were written by Hive? I would really appreciate any further suggestions!
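Since Hive also returns NULLs, one way to narrow this down is to inspect the Parquet footer of a single data file: its created_by field records which engine wrote the file, and the schema shows whether measurement_time is stored as INT96 or INT64. Below is a minimal sketch, assuming a reasonably recent pyarrow is available in the CDSW session (it may not ship with the Anaconda 5.1 parcel) and using a placeholder HDFS path for one file of the par_dt=20240101 partition:

# Hedged sketch: inspect the Parquet footer of one data file to see which
# engine wrote it (Hive/parquet-mr, Impala, and Spark stamp created_by differently).
import pyarrow.parquet as pq
import pyarrow.fs as pafs

hdfs = pafs.HadoopFileSystem(host="default")   # picks up the cluster's Hadoop config
# Placeholder path -- substitute one real file from the partition directory
path = "/warehouse/.../aums.db/eems_archive_data/par_dt=20240101/000000_0"

with hdfs.open_input_file(path) as f:
    meta = pq.ParquetFile(f).metadata
    print(meta.created_by)   # e.g. "parquet-mr ..." (Hive) vs. "impala version ..."
    print(meta.schema)       # shows whether measurement_time is INT96 or an INT64 timestamp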
11-20-2024 01:51 AM
@satz
1. I think the files stored in the table are Parquet, but I will check it.
2. I have already run the script without these configurations and still got None values; that is why I added them to my Spark session in CDSW.
3. The files are fed into HDFS from an SFTP server, using hand-written bash scripts that transform the raw data.
4. Changing "LEGACY" to "CORRECTED" changed nothing.

I expect:

   parameter_id   measurement_time      value   par_dt
0  d7cc8e82-7ad1  2024-01-01 01:34:24   13.45   20240101
1  d7caa072-7ad1  2024-01-01 01:44:50   28.00   20240101

and I get:

   parameter_id   measurement_time   value   par_dt
0  d7cc8e82-7ad1  None               13.45   20240101
1  d7caa072-7ad1  None               28.00   20240101
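One way to check whether the NULLs are present in the files themselves or only appear once the table's schema is applied is to read the partition's Parquet files directly, bypassing the Hive metastore. A minimal sketch, assuming a SparkSession named spark as in the original script and a placeholder path for the table's HDFS location (the real one can be taken from DESCRIBE FORMATTED aums.eems_archive_data):

# Hedged sketch: read the partition directory directly with the Parquet reader,
# so the values are decoded from the file schema rather than the metastore schema.
raw_df = spark.read.parquet(
    "hdfs:///warehouse/.../aums.db/eems_archive_data/par_dt=20240101"  # placeholder path
)
raw_df.printSchema()   # is measurement_time a timestamp, a bigint, or something else?
raw_df.select("parameter_id", "measurement_time").show(5, truncate=False)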
11-18-2024 11:59 PM · 1 Kudo
I am trying to retrieve data from a table in HDFS in which one column contains timestamps. I am connected through CDSW and running a Python script that opens a Spark session and runs an SQL query to retrieve some rows from the table. Although running the same SQL query in the Hue Impala editor returns the proper values, the Python script in CDSW returns only None values for the timestamp column. How can I retrieve my data properly? It is a huge table, so I cannot simply export a CSV file from the Impala editor; I want to retrieve the data for migration to another database. The script I run in CDSW is the following:

import os
import sys
import pandas as pd
import numpy as np
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import lit, unix_timestamp, to_utc_timestamp

# Point PySpark at the Anaconda Python 3.6 interpreter from the parcel
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
os.environ['PROJ_LIB'] = '/home/cdsw/.conda/envs/python3.6/share/proj'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'

spark = SparkSession.builder \
    .master("yarn") \
    .config("spark.sql.session.timeZone", "UTC") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.executor.instances", "30") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "4g") \
    .config("spark.rpc.message.maxSize", "1024") \
    .config("spark.executor.memoryOverhead", "800") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.memoryOverhead", "800") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.sql.broadcastTimeout", "1000") \
    .config("spark.kryoserializer.buffer.max", "1024m") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY") \
    .config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)
sqlContext.sql('SET hive.exec.dynamic.partition = true')
sqlContext.sql('SET hive.exec.dynamic.partition.mode = nonstrict')

# Show all columns when printing the pandas frame
pd.set_option("display.max_rows", None, "display.max_columns", None)

qry = """SELECT parameter_id, measurement_time, value, par_dt
         FROM aums.eems_archive_data
         WHERE par_dt = '20240101'
         LIMIT 10"""

spark_df = spark.sql(qry)
data_df = spark_df.toPandas()
print(data_df.head(1))

Here measurement_time returns None values instead of the expected dates and times.
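One quick way to localize the problem is to look at the same rows before the pandas conversion and then again with Arrow disabled, since the Arrow-based toPandas() path handles timestamps differently from the plain one. A minimal sketch, reusing the spark session and qry defined above:

# Hedged sketch: check whether the NULLs already exist in the Spark DataFrame
# or only appear during the Arrow-based toPandas() conversion.
spark_df = spark.sql(qry)
spark_df.printSchema()                                         # confirm measurement_time's type
spark_df.select("measurement_time").show(10, truncate=False)   # values as Spark reads them

# Repeat the conversion with Arrow turned off (same key as in the script above)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
print(spark_df.toPandas().head(10))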