PySpark: read JSON with a strict schema check and mark valid and invalid records based on non-nullable attributes and invalid JSON
Created 02-06-2025 11:13 PM
Hi,
I have a use case where I need to read JSON files from "/data/json_files/" with the schema enforced.
For completeness, we want to mark the invalid records. Invalid records may be ones where a mandatory field is null, where a data type does not match the schema, or where the JSON itself is invalid.
I have tried a few approaches, but nothing has worked so far. It would be great if someone has already solved this use case, or is knowledgeable in this area, and can share a solution.
Created on 02-10-2025 08:58 AM - edited 02-10-2025 08:59 AM
We can use a UDF to solve this problem.
Sharing a sample below as an example.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

# Define the schema
schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "tasks",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
                        StructField("data_col_4", IntegerType(), True),
                    ]
                )
            ),
            False,
        ),
    ]
)


def validate_record(row):
    try:
        # Spark does not enforce nullability when reading JSON, so the
        # mandatory fields have to be checked explicitly here.
        for task in row.tasks:
            if task.data_col_1 is None or not isinstance(task.data_col_1, int):
                return False, "data_col_1 is missing or invalid in a task"
            if task.data_col_2 is None or not isinstance(task.data_col_2, str):
                return False, "data_col_2 is missing or invalid in a task"
            if task.data_col_3 is not None and not isinstance(task.data_col_3, int):
                return False, "data_col_3 is invalid in a task"
            if task.data_col_4 is not None and not isinstance(task.data_col_4, int):
                return False, "data_col_4 is invalid in a task"
        return True, None  # All checks passed
    except Exception as e:
        # A corrupt record (e.g. tasks is null) lands here
        return False, str(e)


# Read JSON files with PERMISSIVE mode so corrupt records are nulled out
# instead of failing the whole read
raw_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("multiline", "true")
    .json("/tmp/json_data")
    .withColumn("src_filename", input_file_name())
)

# Apply the validation UDF and flatten its struct result into columns
validate_udf = udf(
    validate_record,
    StructType(
        [
            StructField("is_valid", BooleanType()),
            StructField("error_message", StringType()),
        ]
    ),
)
validated_df = (
    raw_df.withColumn("validation_results", validate_udf(struct(*raw_df.columns)))
    .select("*", "validation_results.*")
    .drop("validation_results")
)

# Separate valid and invalid records
valid_df = validated_df.filter("is_valid == true").drop("is_valid", "error_message")
invalid_df = validated_df.filter("is_valid == false").drop("is_valid")

# Show the results
valid_df.show(truncate=False)
invalid_df.show(truncate=False)
Attaching sample cluster console output for reference.
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import *
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Row
>>>
>>> # Define the schema
... schema = StructType(
...     [
...         StructField(
...             "meta",
...             StructType(
...                 [
...                     StructField("id", StringType(), False),
...                     StructField("timestamp", TimestampType(), False),
...                     StructField("version", IntegerType(), False),
...                 ]
...             ),
...             False,
...         ),
...         StructField(
...             "tasks",
...             ArrayType(
...                 StructType(
...                     [
...                         StructField("data_col_1", IntegerType(), False),
...                         StructField("data_col_2", StringType(), False),
...                         StructField("data_col_3", IntegerType(), True),
...                         StructField("data_col_4", IntegerType(), True),
...                     ]
...                 )
...             ),
...             False,
...         ),
...     ]
... )
>>>
>>>
>>> def validate_record(row):
...     try:
...         # Check meta (top level); handle potential null meta
...         meta_dict = row.meta.asDict() if row.meta else {}
...         Row(**meta_dict)  # Rebuild the meta Row (this does not enforce the schema by itself)
...         validated_tasks = []
...         for task in row.tasks:
...             try:
...                 task_dict = task.asDict()
...                 Row(**task_dict)  # Rebuild each task Row
...                 validated_tasks.append(task)  # If valid, add it to the validated task list
...             except:
...                 validated_tasks.append(Row(data_col_1=None, data_col_2=None, data_col_3=None, data_col_4=None, _corrupt_record="Invalid Task"))  # Append a "corrupted" task with nulls
...         return True, None, validated_tasks  # Return validated tasks
...     except Exception as e:
...         return False, str(e), []  # Mark as invalid with the exception message and empty tasks
...
>>>
>>>
>>> raw_df = (
...     spark.read.schema(schema)
...     .option("mode", "PERMISSIVE")
...     .option("multiline", "true")
...     .json("/tmp/json_data")
...     .withColumn("src_filename", input_file_name())
... )
>>> validate_udf = udf(validate_record, StructType([StructField("is_valid", BooleanType()), StructField("error_message", StringType()), StructField("validated_tasks", ArrayType(schema.fields[1].dataType.elementType))]))
>>>
>>> validated_df = raw_df.withColumn("validation_results", validate_udf(struct(*raw_df.columns))).select("*", "validation_results.*").drop("validation_results")
>>> valid_df = validated_df.filter("is_valid == true").drop("is_valid", "error_message").withColumn("tasks", col("validated_tasks")).drop("validated_tasks")
>>> invalid_df = validated_df.filter("is_valid == false").drop("is_valid", "validated_tasks")
>>> valid_df.show(truncate=False)
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
|meta |tasks |src_filename |
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
|[efgh5678, 2025-02-07 07:59:12.123, 1]|[[0, Required, 9, 7], [22, Required, 10, 11]]|hdfs://ccycloud-2.nightly-71x-ms.root.comops.site:8020/tmp/json_data/json_2.json|
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
>>> invalid_df.show(truncate=False)
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
|meta|tasks|src_filename |error_message |
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
|null|null |hdfs://ccycloud-2.nightly-71x-ms.root.comops.site:8020/tmp/json_data/json_1.json|'NoneType' object is not iterable|
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
>>>
Depending on your use case, feel free to adapt the UDF.
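If you would rather keep the null checks in native Spark instead of a Python UDF, a rough sketch along the following lines should also work. It assumes Spark 2.4+ (for the exists higher-order function) and reuses raw_df and the schema from above; adjust the field names to your data.

from pyspark.sql import functions as F

# Sketch only: flag records whose mandatory fields are null using built-in
# expressions instead of a Python UDF (same meta/tasks schema as above).
flagged_df = raw_df.withColumn(
    "error_message",
    F.when(
        F.col("meta").isNull() | F.col("tasks").isNull(),
        F.lit("corrupt record or missing mandatory top-level field"),
    )
    .when(
        F.col("meta.id").isNull()
        | F.col("meta.timestamp").isNull()
        | F.col("meta.version").isNull(),
        F.lit("mandatory meta field is null"),
    )
    .when(
        F.expr("exists(tasks, t -> t.data_col_1 is null or t.data_col_2 is null)"),
        F.lit("mandatory task field is null"),
    ),
)

valid_df = flagged_df.filter(F.col("error_message").isNull()).drop("error_message")
invalid_df = flagged_df.filter(F.col("error_message").isNotNull())

Since PERMISSIVE mode usually nulls out values it cannot cast to the schema type, data type mismatches tend to show up here as null mandatory fields rather than as explicit type errors.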
Created 02-28-2025 04:57 AM
@ggangadharan Thanks for your response and the suggestion.
I will try this out and confirm soon.
However, I was expecting a more structural solution, one handled directly during spark.read.json with the schema.
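For example, something along these lines is roughly what I have in mind: let the reader itself flag invalid JSON through the corrupt-record column. This is only a rough sketch; _corrupt_record is just the default column name, and the schema is the one shared above.

from pyspark.sql.types import StructField, StringType, StructType

# Rough sketch: add the corrupt-record column to the schema so that
# spark.read.json itself flags records that are not valid JSON.
schema_with_corrupt = StructType(
    schema.fields + [StructField("_corrupt_record", StringType(), True)]
)

df = (
    spark.read.schema(schema_with_corrupt)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .cache()  # cache so the corrupt-record column can be filtered reliably
)

invalid_json_df = df.filter("_corrupt_record is not null")
parsed_df = df.filter("_corrupt_record is null").drop("_corrupt_record")

As far as I understand, this only catches JSON that fails to parse; nullability in the schema is still not enforced at read time, so null mandatory fields would still need a separate check like the UDF above.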
Created 02-17-2025 10:14 PM
@Sujit_Jain, Did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future.
Regards,
Vidya Sargur, Community Manager
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Created 03-04-2025 04:55 AM
@ggangadharan I have tried this and it is working. Thanks for the help.
