Member since: 09-16-2021
Posts: 420
Kudos Received: 54
Solutions: 38
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 175 | 09-05-2025 07:19 AM
 | 598 | 07-15-2025 02:22 AM
 | 1172 | 06-02-2025 06:55 AM
 | 1455 | 05-22-2025 03:00 AM
 | 810 | 05-19-2025 03:02 AM
04-10-2025
10:44 PM
To provide the exact HQL query, please share the following:
* DDL for both tables
* Sample records from each table
* The expected output based on the sample data
This information will help us understand the problem statement better and validate the solution.
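For reference, a minimal sketch of how the DDL and sample records could be captured with beeline (the JDBC URL, database, and table names below are placeholders, not from this thread):

# Hypothetical connection string and table names -- replace with your own
beeline -u "jdbc:hive2://<hs2-host>:10000/default" -e "SHOW CREATE TABLE db_name.table_a; SHOW CREATE TABLE db_name.table_b;"
# A few sample records from each table
beeline -u "jdbc:hive2://<hs2-host>:10000/default" -e "SELECT * FROM db_name.table_a LIMIT 5;"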
03-28-2025
03:47 AM
* This has been addressed as part of a support case.
* The Tez job failed with the error below.

Caused by: org.apache.hive.com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
Serialization trace:
columnTypeResolvers (org.apache.hadoop.hive.ql.exec.UnionOperator)
tableDesc (org.apache.hadoop.hive.ql.plan.PartitionDesc)
aliasToPartnInfo (org.apache.hadoop.hive.ql.plan.MapWork)
at org.apache.hive.com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
at org.apache.hive.com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClass(SerializationUtilities.java:186)
at org.apache.hive.com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
at org.apache.hive.com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at org.apache.hive.com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readObject(SerializationUtilities.java:219)
at org.apache.hive.com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at org.apache.hive.com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities$PartitionDescSerializer.read(SerializationUtilities.java:580)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities$PartitionDescSerializer.read(SerializationUtilities.java:572)
at org.apache.hive.com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClassAndObject(SerializationUtilities.java:181)
at org.apache.hive.com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
at org.apache.hive.com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
at org.apache.hive.com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readObject(SerializationUtilities.java:219)
at org.apache.hive.com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at org.apache.hive.com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at org.apache.hive.com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readObject(SerializationUtilities.java:211)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities.deserializeObjectByKryo(SerializationUtilities.java:755)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities.deserializePlan(SerializationUtilities.java:661)
at org.apache.hadoop.hive.ql.exec.SerializationUtilities.deserializePlan(SerializationUtilities.java:638)
at org.apache.hadoop.hive.ql.exec.Utilities.getBaseWork(Utilities.java:492)
... 22 more

* Serialization-related classes were loaded from a different version of hive-exec (hive-exec-<version>.jar).
* Remove the older version of the jar from the HS2 classpath and aux jars to resolve the problem.
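As a minimal sketch of the cleanup step (the search paths below are assumptions; adjust them to your deployment), check the HS2 host for multiple hive-exec versions before removing the older jar:

# Hypothetical install/aux-jar locations -- adjust for your environment
find /opt/cloudera /usr/lib/hive /usr/local -name 'hive-exec-*.jar' 2>/dev/null
# Compare the versions reported above with the cluster's Hive version and remove
# or relocate any jar that does not match, including entries referenced by the
# HS2 aux jars configuration (hive.aux.jars.path).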
03-25-2025
04:12 AM
If the Beeline shell gets stuck, first validate whether HS2 is up and running. Then, check if the query reaches HS2. If the query reaches HS2 but gets stuck, analyze the HS2 JSTACK and HS2 logs to identify the issue. If the query does not reach HS2, validate the Beeline JSTACK, HS2 JSTACK, and HS2 logs. If you are unable to determine the root cause with this information, I recommend raising a support ticket for further investigation.
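A minimal sketch for collecting the thread dumps mentioned above (the process-lookup patterns are assumptions; adjust them to how the services run on your hosts):

# Thread dump of HiveServer2 (run as the user that owns the process)
HS2_PID=$(pgrep -f HiveServer2 | head -1)
jstack -l "$HS2_PID" > /tmp/hs2_jstack_$(date +%s).txt

# Thread dump of the stuck Beeline client
BEELINE_PID=$(pgrep -f org.apache.hive.beeline.BeeLine | head -1)
jstack -l "$BEELINE_PID" > /tmp/beeline_jstack_$(date +%s).txt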
03-21-2025
06:29 AM
To identify which user is writing the files, use HDFS CLI commands such as hdfs dfs -ls or hdfs dfs -getfacl, as shown below.
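A minimal sketch (the path below is a placeholder, not from this thread):

# Owner and group appear in the third and fourth columns of the listing
hdfs dfs -ls /warehouse/tablespace/external/hive/db_name/table_name
# ACL entries show any additional users or groups with write access
hdfs dfs -getfacl /warehouse/tablespace/external/hive/db_name/table_name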
03-06-2025
07:42 AM
Assuming it's a MapReduce job, since you're looking for MapReduce I/O counter information, here is a script to calculate the counter values.

[hive@node4 ~]$ cat get_io_counters.sh
#!/bin/bash
# Ensure a job ID is provided
if [ "$#" -ne 1 ]; then
echo "Usage: $0 <job_id>"
exit 1
fi
JOB_ID=$1
# Extract I/O counters from the MapReduce job status
mapred job -status "$JOB_ID" | egrep -A 1 'File Input Format Counters|File Output Format Counters' | awk -F'=' '
/File Input Format Counters/ {getline; bytes_read=$2}
/File Output Format Counters/ {getline; bytes_written=$2}
END {
total_io_mb = (bytes_read + bytes_written) / (1024 * 1024)
printf "BYTES_READ=%d\nBYTES_WRITTEN=%d\nTOTAL_IO_MB=%.2f\n", bytes_read, bytes_written, total_io_mb
}'
[hive@node4 ~]$

Sample output:

[hive@node4 ~]$ ./get_io_counters.sh job_1741272271547_0007
25/03/06 15:38:34 INFO client.RMProxy: Connecting to ResourceManager at node3.playground-ggangadharan.coelab.cloudera.com/10.129.117.75:8032
25/03/06 15:38:35 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
BYTES_READ=288894
BYTES_WRITTEN=348894
TOTAL_IO_MB=0.61
[hive@node4 ~]$
02-10-2025
08:58 AM
We can use a UDF to solve this problem. Sharing a sample one as an example.

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
# Define the schema
schema = StructType(
[
StructField(
"meta",
StructType(
[
StructField("id", StringType(), False),
StructField("timestamp", TimestampType(), False),
StructField("version", IntegerType(), False),
]
),
False,
),
StructField(
"tasks",
ArrayType(
StructType(
[
StructField("data_col_1", IntegerType(), False),
StructField("data_col_2", StringType(), False),
StructField("data_col_3", IntegerType(), True),
StructField("data_col_4", IntegerType(), True),
]
)
),
False,
),
]
)
def validate_record(row):
try:
# More efficient way to check against schema
row_dict = row.asDict() # Convert Row to dictionary
Row(**row_dict) #Create Row object from Dictionary. This will validate against schema.
# Additional checks within tasks array (same as before)
for task in row.tasks:
if task.data_col_1 is None or not isinstance(task.data_col_1, int):
return False, "data_col_1 is missing or invalid in a task"
if task.data_col_2 is None or not isinstance(task.data_col_2, str):
return False, "data_col_2 is missing or invalid in a task"
if task.data_col_3 is not None and not isinstance(task.data_col_3, int):
return False, "data_col_3 is invalid in a task"
if task.data_col_4 is not None and not isinstance(task.data_col_4, int):
return False, "data_col_4 is invalid in a task"
return True, None # All checks passed
except Exception as e:
return False, str(e) # Mark as invalid with the exception message
# Read JSON files with PERMISSIVE mode to capture corrupt records
raw_df = (
spark.read.schema(schema)
.option("mode", "PERMISSIVE")
.option("multiline", "true")
.json("/tmp/json_data")
.withColumn("src_filename", input_file_name())
)
# Apply validation using a UDF for better performance
validate_udf = udf(validate_record, StructType([StructField("is_valid", BooleanType()), StructField("error_message", StringType())]))
validated_df = raw_df.withColumn("validation_results", validate_udf(struct(*raw_df.columns))).select("*", "validation_results.*").drop("validation_results")
# Separate valid and invalid records (same as before)
valid_df = validated_df.filter("is_valid == true").drop("is_valid", "error_message")
invalid_df = validated_df.filter("is_valid == false").drop("is_valid")
# Show the results
valid_df.show(truncate=False)
invalid_df.show(truncate=False)

Attaching a sample cluster console output for reference.

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import *
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Row
>>>
>>> # Define the schema
... schema = StructType(
... [
... StructField(
... "meta",
... StructType(
... [
... StructField("id", StringType(), False),
... StructField("timestamp", TimestampType(), False),
... StructField("version", IntegerType(), False),
... ]
... ),
... False,
... ),
... StructField(
... "tasks",
... ArrayType(
... StructType(
... [
... StructField("data_col_1", IntegerType(), False),
... StructField("data_col_2", StringType(), False),
... StructField("data_col_3", IntegerType(), True),
... StructField("data_col_4", IntegerType(), True),
... ]
... )
... ),
... False,
... ),
... ]
... )
>>>
>>>
>>> def validate_record(row):
... try:
... # Check meta (top level)
... meta_dict = row.meta.asDict() if row.meta else {} # Handle potential null meta
... Row(**meta_dict) #Validate meta against schema.
... validated_tasks = []
... for task in row.tasks:
... try:
... task_dict = task.asDict()
... Row(**task_dict) #Validate each task against the schema.
... validated_tasks.append(task) #If valid add it to validated task list
... except:
... validated_tasks.append(Row(data_col_1=None, data_col_2=None, data_col_3=None, data_col_4=None, _corrupt_record="Invalid Task")) # Append a "corrupted" task with nulls.
... return True, None, validated_tasks # Return validated tasks
... except Exception as e:
... return False, str(e), [] # Mark as invalid with the exception message and empty tasks
...
>>>
>>>
>>> raw_df = (
... spark.read.schema(schema)
... .option("mode", "PERMISSIVE")
... .option("multiline", "true")
... .json("/tmp/json_data")
... .withColumn("src_filename", input_file_name())
... )
>>> validate_udf = udf(validate_record, StructType([StructField("is_valid", BooleanType()), StructField("error_message", StringType()), StructField("validated_tasks", ArrayType(schema.fields[1].dataType.elementType))]))
>>>
>>> validated_df = raw_df.withColumn("validation_results", validate_udf(struct(*raw_df.columns))).select("*", "validation_results.*").drop("validation_results")
>>> valid_df = validated_df.filter("is_valid == true").drop("is_valid", "error_message").withColumn("tasks", col("validated_tasks")).drop("validated_tasks")
>>> invalid_df = validated_df.filter("is_valid == false").drop("is_valid", "validated_tasks")
>>> valid_df.show(truncate=False)
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
|meta |tasks |src_filename |
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
|[efgh5678, 2025-02-07 07:59:12.123, 1]|[[0, Required, 9, 7], [22, Required, 10, 11]]|hdfs://ccycloud-2.nightly-71x-ms.root.comops.site:8020/tmp/json_data/json_2.json|
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
>>> invalid_df.show(truncate=False)
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
|meta|tasks|src_filename |error_message |
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
|null|null |hdfs://ccycloud-2.nightly-71x-ms.root.comops.site:8020/tmp/json_data/json_1.json|'NoneType' object is not iterable|
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
>>>

Depending upon the use case, feel free to edit the UDF.
02-05-2025
12:48 AM
If the environment allows, use SSSD with LDAP integration to avoid manually creating users. If that's not possible, use Ansible to automate user creation across all nodes.
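A minimal sketch of the Ansible route (the inventory group, user name, and OS group are placeholders, not from this thread):

# Ad-hoc command using Ansible's built-in user module against your own inventory
ansible cluster_nodes -b -m user -a "name=etl_user state=present groups=hadoop append=yes"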
02-04-2025
06:35 AM
Check the Beeline console output and HS2 logs to identify where it gets stuck, and act accordingly.
02-04-2025
06:33 AM
Use CAST to convert to the TIMESTAMP type.

SELECT CAST('2024-11-05 10:03:17.872195' AS TIMESTAMP) AS timestamp_value;

We can also try TIMESTAMP WITH LOCAL TIME ZONE, which helps retain precision when dealing with time zones.

SELECT CAST('2024-11-05 10:03:17.872195' AS TIMESTAMP WITH LOCAL TIME ZONE);
02-04-2025
05:55 AM
It appears that the user 'xxxx' has not been synchronized from LDAP to the local OS on the relevant host. This could be due to a misconfiguration on the AD/LDAP side that prevents correct username resolution and causes the synchronization to fail. Resolve the AD/LDAP-side issue to overcome this problem. Also see the documentation for CDP 7.1.7.
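A minimal sketch for verifying whether the user resolves on the affected host (assumes SSSD is the sync mechanism; skip the cache step otherwise):

# If these return nothing, the host cannot resolve the LDAP user locally
id xxxx
getent passwd xxxx
# After fixing the AD/LDAP side, clearing the SSSD cache can speed up re-sync
sss_cache -u xxxx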