Member since: 09-16-2021
Posts: 423
Kudos Received: 55
Solutions: 39
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1298 | 10-22-2025 05:48 AM |
| | 1378 | 09-05-2025 07:19 AM |
| | 2337 | 07-15-2025 02:22 AM |
| | 3196 | 05-22-2025 03:00 AM |
| | 2003 | 05-19-2025 03:02 AM |
03-25-2025
04:12 AM
If the Beeline shell gets stuck, first check whether HS2 is up and running. Then check whether the query reaches HS2. If it reaches HS2 but hangs there, analyze the HS2 jstack (thread dump) and the HS2 logs to identify the issue. If the query does not reach HS2, check the Beeline jstack, the HS2 jstack, and the HS2 logs. If you cannot determine the root cause from this information, I recommend raising a support ticket for further investigation.
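For the "is HS2 up" step, a minimal Python sketch like the one below tests whether the HS2 port accepts TCP connections. It assumes binary (Thrift) transport on the default port 10000 (`hive.server2.thrift.port`); clusters using HTTP transport or a custom port need a different port number, and `is_port_open` is a hypothetical helper. A successful connect only shows that something is listening on the port, not that HS2 can run queries. The thread dumps themselves are usually captured with `jstack <pid>` against the Beeline or HS2 process.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and connects; the socket is closed on exit.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution failures.
        return False
```

Usage: `is_port_open("hs2-host.example.com", 10000)`, where the host name is a placeholder for your HS2 host.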
03-21-2025
06:29 AM
To identify which user is writing the files, use HDFS CLI commands such as `hdfs dfs -ls` (the third column shows the file owner) or `hdfs dfs -getfacl`.
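If you need to check many files, the owner column can be pulled out of the `hdfs dfs -ls` output with a few lines of Python. A minimal sketch, assuming the usual 8-column listing (permissions, replication, owner, group, size, date, time, path); `file_owners` is a hypothetical helper, and the sample listing in the usage line is made up for illustration.

```python
def file_owners(ls_output: str) -> dict:
    """Map each path in `hdfs dfs -ls` output to its owner (third column)."""
    owners = {}
    for line in ls_output.splitlines():
        # Split into at most 8 fields so a path containing spaces stays whole.
        parts = line.split(None, 7)
        # Skip lines like "Found 2 items"; file/dir rows start with '-' or 'd'.
        if len(parts) == 8 and parts[0][0] in "-d":
            owners[parts[7]] = parts[2]
    return owners
```

Usage: pass the captured text of `hdfs dfs -ls /some/dir`, e.g. `file_owners(listing)`. Keep in mind the owner is the user who created the file unless it was later changed with `chown`.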
03-06-2025
07:42 AM
Assuming this is a MapReduce job (since you're asking about MapReduce I/O counters), the following script, `get_io_counters.sh`, calculates the counter info:

```bash
#!/bin/bash
# Print the bytes read by the job's input format, the bytes written by its
# output format, and their total in MB.

# Ensure a job ID is provided
if [ "$#" -ne 1 ]; then
  echo "Usage: $0 <job_id>"
  exit 1
fi

JOB_ID=$1

# Extract I/O counters from the MapReduce job status.
# grep -A 1 keeps each counter-group header plus the "Name=value" line after it;
# awk reads that following line (getline) and takes the value after '='.
mapred job -status "$JOB_ID" | grep -E -A 1 'File Input Format Counters|File Output Format Counters' | awk -F'=' '
  /File Input Format Counters/  {getline; bytes_read=$2}
  /File Output Format Counters/ {getline; bytes_written=$2}
  END {
    total_io_mb = (bytes_read + bytes_written) / (1024 * 1024)
    printf "BYTES_READ=%d\nBYTES_WRITTEN=%d\nTOTAL_IO_MB=%.2f\n", bytes_read, bytes_written, total_io_mb
  }'
```
Sample output:

```
[hive@node4 ~]$ ./get_io_counters.sh job_1741272271547_0007
25/03/06 15:38:34 INFO client.RMProxy: Connecting to ResourceManager at node3.playground-ggangadharan.coelab.cloudera.com/10.129.117.75:8032
25/03/06 15:38:35 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
BYTES_READ=288894
BYTES_WRITTEN=348894
TOTAL_IO_MB=0.61
[hive@node4 ~]$
```
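If you'd rather post-process the status output in Python, here is a minimal port of the same logic. Like the awk script, it assumes each counter-group header printed by `mapred job -status` is followed by a `Name=value` line; `parse_io_counters` is a hypothetical helper name. The status text can be captured with `subprocess.run(["mapred", "job", "-status", job_id], capture_output=True, text=True).stdout`.

```python
def parse_io_counters(status_text: str) -> dict:
    """Python equivalent of get_io_counters.sh for already-captured status text."""
    bytes_read = bytes_written = 0
    lines = status_text.splitlines()
    # Pair each line with the one after it, mirroring grep -A 1 + getline.
    for header, next_line in zip(lines, lines[1:]):
        if "File Input Format Counters" in header:
            bytes_read = int(next_line.split("=", 1)[1])
        elif "File Output Format Counters" in header:
            bytes_written = int(next_line.split("=", 1)[1])
    total_io_mb = (bytes_read + bytes_written) / (1024 * 1024)
    return {
        "BYTES_READ": bytes_read,
        "BYTES_WRITTEN": bytes_written,
        "TOTAL_IO_MB": round(total_io_mb, 2),
    }
```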
02-10-2025
08:58 AM
We can use a UDF to solve this problem. Here is a sample for reference:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, struct, udf
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# In the pyspark shell `spark` already exists; getOrCreate() reuses it.
spark = SparkSession.builder.getOrCreate()

# Define the expected schema
schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "tasks",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
                        StructField("data_col_4", IntegerType(), True),
                    ]
                )
            ),
            False,
        ),
    ]
)


def validate_record(row):
    """Return (is_valid, error_message) for one input row."""
    try:
        # Types were already applied by spark.read.schema(), but nullable=False
        # is not enforced as a validation rule by the reader, so required task
        # fields are checked explicitly. If the whole record was malformed,
        # `tasks` is None and iterating over it raises a TypeError, caught below.
        for task in row.tasks:
            if task.data_col_1 is None or not isinstance(task.data_col_1, int):
                return False, "data_col_1 is missing or invalid in a task"
            if task.data_col_2 is None or not isinstance(task.data_col_2, str):
                return False, "data_col_2 is missing or invalid in a task"
            if task.data_col_3 is not None and not isinstance(task.data_col_3, int):
                return False, "data_col_3 is invalid in a task"
            if task.data_col_4 is not None and not isinstance(task.data_col_4, int):
                return False, "data_col_4 is invalid in a task"
        return True, None  # All checks passed
    except Exception as e:
        return False, str(e)  # Mark as invalid with the exception message


# Read JSON files in PERMISSIVE mode so malformed records are kept (with null
# fields) instead of failing the whole read
raw_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("multiline", "true")
    .json("/tmp/json_data")
    .withColumn("src_filename", input_file_name())
)

# Wrap the Python function as a UDF returning an (is_valid, error_message) struct
result_schema = StructType(
    [
        StructField("is_valid", BooleanType()),
        StructField("error_message", StringType()),
    ]
)
validate_udf = udf(validate_record, result_schema)

validated_df = (
    raw_df.withColumn("validation_results", validate_udf(struct(*raw_df.columns)))
    .select("*", "validation_results.*")
    .drop("validation_results")
)

# Separate valid and invalid records
valid_df = validated_df.filter("is_valid == true").drop("is_valid", "error_message")
invalid_df = validated_df.filter("is_valid == false").drop("is_valid")

# Show the results
valid_df.show(truncate=False)
invalid_df.show(truncate=False)
```

Attaching sample cluster console output for reference:

```
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import *
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Row
>>>
>>> # Define the schema
... schema = StructType(
... [
... StructField(
... "meta",
... StructType(
... [
... StructField("id", StringType(), False),
... StructField("timestamp", TimestampType(), False),
... StructField("version", IntegerType(), False),
... ]
... ),
... False,
... ),
... StructField(
... "tasks",
... ArrayType(
... StructType(
... [
... StructField("data_col_1", IntegerType(), False),
... StructField("data_col_2", StringType(), False),
... StructField("data_col_3", IntegerType(), True),
... StructField("data_col_4", IntegerType(), True),
... ]
... )
... ),
... False,
... ),
... ]
... )
>>>
>>>
>>> def validate_record(row):
... try:
... # Check meta (top level)
... meta_dict = row.meta.asDict() if row.meta else {} # Handle potential null meta
... Row(**meta_dict) #Validate meta against schema.
... validated_tasks = []
... for task in row.tasks:
... try:
... task_dict = task.asDict()
... Row(**task_dict) #Validate each task against the schema.
... validated_tasks.append(task) #If valid add it to validated task list
... except:
... validated_tasks.append(Row(data_col_1=None, data_col_2=None, data_col_3=None, data_col_4=None, _corrupt_record="Invalid Task")) # Append a "corrupted" task with nulls.
... return True, None, validated_tasks # Return validated tasks
... except Exception as e:
... return False, str(e), [] # Mark as invalid with the exception message and empty tasks
...
>>>
>>>
>>> raw_df = (
... spark.read.schema(schema)
... .option("mode", "PERMISSIVE")
... .option("multiline", "true")
... .json("/tmp/json_data")
... .withColumn("src_filename", input_file_name())
... )
>>> validate_udf = udf(validate_record, StructType([StructField("is_valid", BooleanType()), StructField("error_message", StringType()), StructField("validated_tasks", ArrayType(schema.fields[1].dataType.elementType))]))
>>>
>>> validated_df = raw_df.withColumn("validation_results", validate_udf(struct(*raw_df.columns))).select("*", "validation_results.*").drop("validation_results")
>>> valid_df = validated_df.filter("is_valid == true").drop("is_valid", "error_message").withColumn("tasks", col("validated_tasks")).drop("validated_tasks")
>>> invalid_df = validated_df.filter("is_valid == false").drop("is_valid", "validated_tasks")
>>> valid_df.show(truncate=False)
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
|meta |tasks |src_filename |
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
|[efgh5678, 2025-02-07 07:59:12.123, 1]|[[0, Required, 9, 7], [22, Required, 10, 11]]|hdfs://ccycloud-2.nightly-71x-ms.root.comops.site:8020/tmp/json_data/json_2.json|
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
>>> invalid_df.show(truncate=False)
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
|meta|tasks|src_filename |error_message |
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
|null|null |hdfs://ccycloud-2.nightly-71x-ms.root.comops.site:8020/tmp/json_data/json_1.json|'NoneType' object is not iterable|
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
>>>
```

Depending on the use case, feel free to adapt the UDF.
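Because the per-task checks don't depend on Spark, they can be unit-tested in plain Python before being wrapped in a UDF. A minimal sketch: `Task` is a hypothetical namedtuple standing in for `pyspark.sql.Row` (both are tuples with attribute access), and `validate_tasks` is a hypothetical helper that repeats the checks from `validate_record`. Unlike the UDF, it reports a missing `tasks` array explicitly instead of relying on the `TypeError` raised by iterating over `None`.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row with the task fields from the schema.
Task = namedtuple("Task", ["data_col_1", "data_col_2", "data_col_3", "data_col_4"])


def validate_tasks(tasks):
    """Apply the same per-task checks as validate_record; return (ok, error)."""
    if tasks is None:
        return False, "tasks is missing"
    for task in tasks:
        if task.data_col_1 is None or not isinstance(task.data_col_1, int):
            return False, "data_col_1 is missing or invalid in a task"
        if task.data_col_2 is None or not isinstance(task.data_col_2, str):
            return False, "data_col_2 is missing or invalid in a task"
        # data_col_3 and data_col_4 are optional but must be ints when present.
        for name in ("data_col_3", "data_col_4"):
            value = getattr(task, name)
            if value is not None and not isinstance(value, int):
                return False, f"{name} is invalid in a task"
    return True, None
```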
02-05-2025
12:48 AM
If the environment allows, use SSSD with LDAP integration so that users don't have to be created manually. If that's not possible, use Ansible to automate user creation across all nodes.
02-04-2025
06:35 AM
Check the Beeline console output and the HS2 logs to identify where the query gets stuck, and act accordingly.
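To narrow down where a query is stuck, the HS2 log can be scanned for the query's lifecycle messages. A minimal Python sketch, assuming the Hive Driver's usual `Compiling command(queryId=...)`, `Completed compiling command(...)`, `Executing command(...)` and `Completed executing command(...)` messages; verify the exact wording against your Hive version's HS2 log. `last_stage` is a hypothetical helper. `None` suggests the query never reached this HS2 instance, `Compiling` or `Executing` shows the phase it is stuck in, and `Completed executing` points towards result fetching or the client side.

```python
import re

# Assumed Hive Driver log message format; adjust the pattern if your version differs.
STAGE_PATTERN = re.compile(
    r"(Compiling|Completed compiling|Executing|Completed executing) "
    r"command\(queryId=([^)]+)\)"
)


def last_stage(log_lines, query_id):
    """Return the last lifecycle stage logged for query_id, or None if not found."""
    stage = None
    for line in log_lines:
        match = STAGE_PATTERN.search(line)
        if match and match.group(2) == query_id:
            stage = match.group(1)
    return stage
```

An open log file can be passed directly, since the function only iterates over lines.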
02-04-2025
06:33 AM
Use CAST to convert the string to the TIMESTAMP type:

```sql
SELECT CAST('2024-11-05 10:03:17.872195' AS TIMESTAMP) AS timestamp_value;
```

We can also try TIMESTAMP WITH LOCAL TIME ZONE, which helps keep the correct point in time when dealing with time zones:

```sql
SELECT CAST('2024-11-05 10:03:17.872195' AS TIMESTAMP WITH LOCAL TIME ZONE);
```
02-04-2025
05:55 AM
It appears that the user 'xxxx' has not been synchronized from LDAP to the local OS on the relevant host. This could be caused by a misconfiguration on the AD/LDAP side that prevents the username from being resolved correctly and causes the synchronization to fail. Resolve the AD/LDAP-side problem to fix this. Also see the documentation for CDP 7.1.7.
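To confirm whether a user resolves on a given host, `id <user>` or `getent passwd <user>` on that host is the usual check. The same lookup can be done from Python through the standard `pwd` module, which goes through NSS (and therefore SSSD, if configured). A minimal sketch with a hypothetical helper `unresolved_users`, useful for checking a list of service or end users on each node:

```python
import pwd


def unresolved_users(usernames):
    """Return the users the OS cannot resolve via NSS (local files, SSSD/LDAP, ...)."""
    missing = []
    for name in usernames:
        try:
            pwd.getpwnam(name)  # Raises KeyError if the user is unknown to NSS.
        except KeyError:
            missing.append(name)
    return missing
```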
12-26-2024
08:21 PM
1 Kudo
The error message "Invalid SessionHandle: SessionHandle" commonly occurs in Hive when there is a problem with the session handle being used. A session handle is a unique identifier that Hive creates when a user connects; it is used to maintain the state and context of that session. One possible scenario: a table contains a large number of records and the cluster has multiple HS2 instances. If Knox is used to connect to Hive, Knox connects to one HS2 instance and runs the query there. Because of the large number of records, the query takes a long time to process. If the connection times out on Knox's side and Knox reconnects to a different HS2 instance, that instance does not know the original session, and the query fails with the "Invalid SessionHandle" error. To investigate this scenario, check the HS2 logs and the Knox logs. To determine why the query runs long, also check the HS2 logs and the application logs of any YARN job launched by HS2.
11-28-2024
11:47 PM
1 Kudo
First of all, it is not recommended to use the same location for both internal (managed) and external tables. Internal tables in Hive are native tables that are fully controlled by Hive itself. External tables, on the other hand, can also be accessed by other components such as Spark, Impala, and direct file system operations. Because those components can change an external table's files without Hive's knowledge, Hive has to rely on the files actually present at the table location: to obtain the count, it reads the files by launching a MapReduce job. If other components are not using the table, it is recommended to use a managed table.