Member since: 01-12-2024
Posts: 3
Kudos Received: 0
Solutions: 1
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 462 | 01-16-2024 11:20 PM |
02-06-2025 11:13 PM
Hi, I have a use case where I need to read JSON files from "/data/json_files/" with a schema enforced. For completeness, we want to mark the invalid records. A record may be invalid because a mandatory field is null, a data type does not match, or the JSON itself is invalid. I have tried the approaches below, but nothing has worked so far. It would be great if someone has already solved this use case or is knowledgeable in this area.

Example schema:

```python
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, TimestampType, ArrayType
)

schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "tasks",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
                        StructField("data_col_4", IntegerType(), True),
                    ]
                )
            ),
            False,
        ),
    ]
)
```

json_1.json: "data_col_4" has the wrong data type in one task, and "data_col_2" is mandatory per the schema but is null in another.

```json
{
  "meta": {
    "id": "abcd1234",
    "timestamp": "2025-02-07T07:59:12.123Z",
    "version": 1
  },
  "tasks": [
    {
      "data_col_1": 12,
      "data_col_2": "Required",
      "data_col_3": 9,
      "data_col_4": 7
    },
    {
      "data_col_1": 13,
      "data_col_2": "Required",
      "data_col_3": 10,
      "data_col_4": "Wrong data type"
    },
    {
      "data_col_1": 14,
      "data_col_2": null,
      "data_col_3": 11,
      "data_col_4": 8
    }
  ]
}
```

json_2.json: "data_col_1" is missing from one of the tasks.

```json
{
  "meta": {
    "id": "efgh5678",
    "timestamp": "2025-02-07T07:59:12.123Z",
    "version": 1
  },
  "tasks": [
    {
      "data_col_2": "Required",
      "data_col_3": 9,
      "data_col_4": 7
    },
    {
      "data_col_1": 22,
      "data_col_2": "Required",
      "data_col_3": 10,
      "data_col_4": 11
    }
  ]
}
```

PySpark code:

```python
from pyspark.sql.functions import input_file_name

raw_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)
```

or

```python
invalid_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)
```

Expected outcome: all valid records in meta and within the tasks array should be processed, and invalid records (missing mandatory field, incorrect data type, or invalid JSON) should be marked as invalid.
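One possible approach (not from the original post, and only a sketch to verify against a sample of the real files): read with the posted schema made fully nullable plus a `_corrupt_record` string column, then explode the tasks array and derive an explicit validity flag. The names `read_schema`, `tasks_df`, `task`, and `is_invalid` are illustrative, and the exact PERMISSIVE-mode behaviour for a type mismatch inside an otherwise well-formed file varies between Spark versions.

```python
# Sketch only: flag per-task validity explicitly instead of relying on PERMISSIVE mode alone.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType, IntegerType, StringType, StructField, StructType, TimestampType
)

spark = SparkSession.builder.getOrCreate()

# Read with everything nullable and an extra _corrupt_record column so that
# files Spark cannot parse at all are kept instead of becoming all-null rows.
read_schema = StructType([
    StructField("meta", StructType([
        StructField("id", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("version", IntegerType(), True),
    ]), True),
    StructField("tasks", ArrayType(StructType([
        StructField("data_col_1", IntegerType(), True),
        StructField("data_col_2", StringType(), True),
        StructField("data_col_3", IntegerType(), True),
        StructField("data_col_4", IntegerType(), True),
    ])), True),
    StructField("_corrupt_record", StringType(), True),
])

raw_df = (
    spark.read.schema(read_schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", F.input_file_name())
)

# One row per task: a row is invalid if the file was unparseable or a
# mandatory field is null (a type mismatch under PERMISSIVE parsing
# typically also shows up as a null after the read).
tasks_df = (
    raw_df
    .withColumn("task", F.explode_outer("tasks"))
    .withColumn(
        "is_invalid",
        F.col("_corrupt_record").isNotNull()
        | F.col("meta.id").isNull()
        | F.col("meta.timestamp").isNull()
        | F.col("meta.version").isNull()
        | F.col("task.data_col_1").isNull()
        | F.col("task.data_col_2").isNull(),
    )
)

tasks_df.select("src_filename", "task.*", "is_invalid").show(truncate=False)
```

One thing to watch: `_corrupt_record` has to be declared in the user-supplied schema (as above) for PERMISSIVE mode to populate it, and in many Spark versions it only captures files that fail to parse outright, which is why the explicit null checks carry the missing-field and type-mismatch cases.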
01-16-2024 11:20 PM
Hello Community, I have found a solution for this issue. The Authentication Expiration value was less than the gap between the last run (today) and the next day's first trigger (tomorrow), so the Kerberos ticket had already expired and the processor was unable to run. We have increased the Authentication Expiration period and the issue is now resolved. Thanks, Sujit
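For illustration of that gap (the expiration value below is an assumption, not taken from the thread): under a schedule whose last trigger of the day is 19:30 and whose first trigger the next morning is 10:30, the overnight gap is 15 hours, longer than a 12-hour authentication lifetime.

```python
from datetime import datetime, timedelta

# Hypothetical values: last trigger of the day at 19:30, first trigger next day at 10:30.
last_trigger = datetime(2024, 1, 11, 19, 30)
next_trigger = datetime(2024, 1, 12, 10, 30)
auth_expiration = timedelta(hours=12)  # assumed Authentication Expiration setting

gap = next_trigger - last_trigger
print(gap, gap > auth_expiration)  # 15:00:00 True -> the credential lapses before the next run
```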
01-12-2024 12:54 AM
Hello Everyone, we are facing the below issue in our CFM.

Background: We have a NiFi flow that uses the GetHDFS processor to fetch files from HDFS, performs a few transformations, and writes the data back to HDFS using the PutHDFS processor. The CRON schedule used for this was: 0 0/45 06-19,21-05 1/1 * ? *

Now: Based on a new requirement we changed the CRON schedule to "0 30 10-19 1/1 * ? *" (0 30 10-19 * * ? *). The GetHDFS processor runs on the new CRON schedule and processed the data on the day of the change. But from the next day onwards the processor runs (threads are initiated on all nodes per the CRON schedule) yet does not process the data and gives the warning below.

Warning: Error while retrieving list of files due to DestHost:desport <master_hostname>:<port> LocalHost:localport <worker_node>:<port> Failed on Local Exception: java.io.IOException: Couldn't setup connection for <nifi_user>@principal to <master_hostname>

Does anyone recognise this issue? Any inputs would be appreciated. Thanks, Sujit
Labels:
- Apache NiFi