
PySpark JSON read with strict schema check: mark valid and invalid records based on non-nullable attributes and invalid JSON itself

Explorer

Hi,

I have a use case where I have to read JSON files from "/data/json_files/" with the schema enforced.
For completeness, we want to mark the invalid records. Invalid records may be ones where a mandatory field is null, a data type does not match, or the JSON itself is invalid.

I have tried the approaches below, but nothing has worked so far. It would be great if someone who has already handled this use case, or is knowledgeable in this area, could share a solution.

 
Example Schema:
schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "data",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
StructField("data_col_4", IntegerType(), True)
                    ]
                )
            ),
            False
        )
    ]
)
 
 
JSON file:
 
json_1.json
 
"data_col_4" is having wrong data type.
"data_col_2" is mandatory as per schema but got null.
 
{
    "meta": {
        "id": "abcd1234",
        "timestamp": "2025-02-07T07:59:12.123Z",
        "version": 1
    },
    "tasks": [
        {
            "data_col_1": 12,
            "data_col_2": "Required",
            "data_col_3": 9,
            "data_col_4": 7
        },
        {
            "data_col_1": 13,
            "data_col_2": "Required",
            "data_col_3": 10,
            "data_col_4": "Wrong data type"
        },
        {
            "data_col_1": 14,
            "data_col_2": null,
            "data_col_3": 11,
            "data_col_4": 8
        }
    ]
}
 
json_2.json
 
the "data_col_1" is missing in the tasks.
 
{
    "meta": {
        "id": "efgh5678",
        "timestamp": "2025-02-07T07:59:12.123Z",
        "version": 1
    },
    "tasks": [
        {
            "data_col_2": "Required",
            "data_col_3": 9,
            "data_col_4": 7
        },
        {
            "data_col_1": 22,
            "data_col_2": "Required",
            "data_col_3": 10,
            "data_col_4": 11
        }
    ]
}
 
 
 
PySpark Code:
 
from pyspark.sql.functions import input_file_name

raw_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)

OR

invalid_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)
 
 
Expected Outcome:
All valid records, both in meta and within the tasks array, should be processed, and any record that is invalid (missing mandatory field, incorrect data type, or invalid JSON) should be marked as invalid.
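
For illustration, this is the kind of split I am hoping to end up with. The snippet below is purely illustrative and hand-built from the two sample files; the column names are not fixed.

expected_invalid = spark.createDataFrame(
    [
        ("json_1.json", "data_col_4 has the wrong data type in one task"),
        ("json_1.json", "data_col_2 is mandatory but null in one task"),
        ("json_2.json", "data_col_1 is missing in one task"),
    ],
    ["src_filename", "error_message"],
)
expected_invalid.show(truncate=False)

Everything else would end up in a valid_df that conforms to the enforced schema.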
 
1 ACCEPTED SOLUTION

avatar
Master Collaborator

We can use a UDF to solve this problem.

Sharing a sample below as an example.

 

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row

# Define the schema
schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "tasks",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
                        StructField("data_col_4", IntegerType(), True),
                    ]
                )
            ),
            False,
        ),
    ]
)

def validate_record(row):
    try:
        # Rebuild a Row from the dictionary. This catches structurally broken
        # rows, but it does not enforce the Spark schema by itself; the explicit
        # field checks below do the real validation.
        row_dict = row.asDict()  # Convert Row to dictionary
        Row(**row_dict)

        # If the file could not be parsed at all, tasks typically comes back null
        if row.tasks is None:
            return False, "tasks array is missing or the record could not be parsed"

        # Field-level checks within the tasks array
        for task in row.tasks:
            if task.data_col_1 is None or not isinstance(task.data_col_1, int):
                return False, "data_col_1 is missing or invalid in a task"
            if task.data_col_2 is None or not isinstance(task.data_col_2, str):
                return False, "data_col_2 is missing or invalid in a task"
            if task.data_col_3 is not None and not isinstance(task.data_col_3, int):
                return False, "data_col_3 is invalid in a task"
            if task.data_col_4 is not None and not isinstance(task.data_col_4, int):
                return False, "data_col_4 is invalid in a task"
        return True, None  # All checks passed
    except Exception as e:
        return False, str(e)  # Mark as invalid with the exception message


# Read JSON files with PERMISSIVE mode so malformed records become null rows instead of failing the read
raw_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("multiline", "true")
    .json("/tmp/json_data")
    .withColumn("src_filename", input_file_name())
)

# Wrap the validation logic in a UDF that returns (is_valid, error_message)
validate_udf = udf(
    validate_record,
    StructType([
        StructField("is_valid", BooleanType()),
        StructField("error_message", StringType()),
    ]),
)

validated_df = (
    raw_df.withColumn("validation_results", validate_udf(struct(*raw_df.columns)))
    .select("*", "validation_results.*")
    .drop("validation_results")
)

# Separate valid and invalid records
valid_df = validated_df.filter("is_valid == true").drop("is_valid", "error_message")
invalid_df = validated_df.filter("is_valid == false").drop("is_valid")

# Show the results
valid_df.show(truncate=False)
invalid_df.show(truncate=False)
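
One note on the PERMISSIVE read above: when an explicit schema is supplied, the column named by columnNameOfCorruptRecord only shows up in the output if it is also declared in that schema as a nullable StringType field. A minimal sketch of that variant, assuming the same schema object and input path as above:

from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StringType, StructField, StructType

# Sketch only: append a corrupt-record column to a copy of the schema so that
# files which are not valid JSON at all end up with _corrupt_record populated
# instead of being reduced to an all-null row.
schema_with_corrupt = StructType(
    schema.fields + [StructField("_corrupt_record", StringType(), True)]
)

raw_df_with_corrupt = (
    spark.read.schema(schema_with_corrupt)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/tmp/json_data")
    .withColumn("src_filename", input_file_name())
)

# Rows where _corrupt_record is not null could not be parsed against the schema.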

 


Attaching sample cluster console output for reference. 

 

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import *
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Row
>>>
>>> # Define the schema
... schema = StructType(
...     [
...         StructField(
...             "meta",
...             StructType(
...                 [
...                     StructField("id", StringType(), False),
...                     StructField("timestamp", TimestampType(), False),
...                     StructField("version", IntegerType(), False),
...                 ]
...             ),
...             False,
...         ),
...         StructField(
...             "tasks",
...             ArrayType(
...                 StructType(
...                     [
...                         StructField("data_col_1", IntegerType(), False),
...                         StructField("data_col_2", StringType(), False),
...                         StructField("data_col_3", IntegerType(), True),
...                         StructField("data_col_4", IntegerType(), True),
...                     ]
...                 )
...             ),
...             False,
...         ),
...     ]
... )
>>>
>>>
>>> def validate_record(row):
...     try:
...         # Check meta (top level)
...         meta_dict = row.meta.asDict() if row.meta else {} # Handle potential null meta
...         Row(**meta_dict) #Validate meta against schema.
...         validated_tasks = []
...         for task in row.tasks:
...             try:
...                 task_dict = task.asDict()
...                 Row(**task_dict) #Validate each task against the schema.
...                 validated_tasks.append(task) #If valid add it to validated task list
...             except:
...                 validated_tasks.append(Row(data_col_1=None, data_col_2=None, data_col_3=None, data_col_4=None, _corrupt_record="Invalid Task")) # Append a "corrupted" task with nulls.
...         return True, None, validated_tasks # Return validated tasks
...     except Exception as e:
...         return False, str(e), []  # Mark as invalid with the exception message and empty tasks
...
>>>
>>>
>>> raw_df = (
...     spark.read.schema(schema)
...     .option("mode", "PERMISSIVE")
...     .option("multiline", "true")
...     .json("/tmp/json_data")
...     .withColumn("src_filename", input_file_name())
... )
>>> validate_udf = udf(validate_record, StructType([StructField("is_valid", BooleanType()), StructField("error_message", StringType()), StructField("validated_tasks", ArrayType(schema.fields[1].dataType.elementType))]))
>>>
>>> validated_df = raw_df.withColumn("validation_results", validate_udf(struct(*raw_df.columns))).select("*", "validation_results.*").drop("validation_results")
>>> valid_df = validated_df.filter("is_valid == true").drop("is_valid", "error_message").withColumn("tasks", col("validated_tasks")).drop("validated_tasks")
>>> invalid_df = validated_df.filter("is_valid == false").drop("is_valid", "validated_tasks")
>>> valid_df.show(truncate=False)
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
|meta                                  |tasks                                        |src_filename                                                                    |
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+
|[efgh5678, 2025-02-07 07:59:12.123, 1]|[[0, Required, 9, 7], [22, Required, 10, 11]]|hdfs://ccycloud-2.nightly-71x-ms.root.comops.site:8020/tmp/json_data/json_2.json|
+--------------------------------------+---------------------------------------------+--------------------------------------------------------------------------------+

>>> invalid_df.show(truncate=False)
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
|meta|tasks|src_filename                                                                    |error_message                    |
+----+-----+--------------------------------------------------------------------------------+---------------------------------+
|null|null |hdfs://ccycloud-2.nightly-71x-ms.root.comops.site:8020/tmp/json_data/json_1.json|'NoneType' object is not iterable|
+----+-----+--------------------------------------------------------------------------------+---------------------------------+

>>>

 

Depending upon the use case, feel free to adapt the UDF.
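
If you would rather avoid a Python UDF, a column-only variant is also possible. The sketch below assumes the raw_df from the snippet above and PySpark 3.1+ (for the exists higher-order function); it only covers the null checks on the mandatory task fields, so treat it as a starting point rather than a drop-in replacement.

from pyspark.sql import functions as F

# Sketch only: a record is flagged invalid when meta or tasks is null, or when
# any task has a null mandatory field (data_col_1 / data_col_2). After a
# PERMISSIVE read with an enforced schema, type mismatches typically surface as
# nulls as well (on the offending field or on the whole row, depending on how
# parsing fails), so they are caught by the same predicate.
flagged_df = raw_df.withColumn(
    "is_valid",
    F.col("meta").isNotNull()
    & F.col("tasks").isNotNull()
    & ~F.exists("tasks", lambda t: t["data_col_1"].isNull() | t["data_col_2"].isNull()),
)

valid_df = flagged_df.filter(F.col("is_valid")).drop("is_valid")
invalid_df = flagged_df.filter(~F.col("is_valid")).drop("is_valid")

Combined with the corrupt-record column from the earlier sketch, the same predicate could also treat a non-null _corrupt_record as invalid.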


4 REPLIES


Explorer

@ggangadharan Thanks for your response and the suggestion.
I will try this out and confirm soon.

However, I was expecting a structural solution, one handled during spark.read.json with the schema itself.

Community Manager

@Sujit_Jain, did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future.



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Explorer

@ggangadharan I have tried this and it is working. Thanks for the help.