Created on 05-13-2023 01:10 AM - edited 05-13-2023 03:36 AM
Dear Experts,
We want to learn how to perform batch processing on semi-structured JSON flow files in NiFi. We know that we can use the JoltTransformRecord, UpdateRecord, and QueryRecord processors, each of which needs a record reader and a record writer controller service. However, we are unsure how to configure the reader and writer controller services when the data has only a semi-structured schema.
The sample below is a multi-line JSON flow file (one JSON object per line) in which the "code" field is static and the "other" field is an unstructured nested JSON object. We want to query the "code" field in batch mode and carry the "other" field through into the resulting flow files for use in subsequent processors. How should we set the reader and writer controller services to achieve this?
Regards,
{"code": "6", "other": {"feild1": "data1"}}
{"code": "7", "other": {"feild2": "data2"}}
{"code": "8", "other": {"feild3": "data3"}}
Created 05-13-2023 06:52 AM
Hi @Arash ,
I am not sure there is a reader/writer that can work with semi-structured data. You could develop a custom reader/writer, but that would take some effort. Since your input arrives as one JSON record per line, you can either use a SplitText processor to split each JSON record into its own flowfile and then process each record independently, or convert the input into a JSON array using two ReplaceText processors (configured as described below) and then use QueryRecord and UpdateRecord with a JsonTreeReader and a JSON record writer (JsonRecordSetWriter).
First ReplaceText: replace line break with comma
Second ReplaceText: surround the entire text with []
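As a rough illustration of what those two text replacements do to the content, here is a minimal sketch in plain Python (not NiFi configuration). The function name `ndjson_to_array_text` is hypothetical, and the sketch assumes there are no blank lines in the middle of the input, since those would leave an empty element between two commas.

```python
import json
import re

def ndjson_to_array_text(text: str) -> str:
    """Turn one-JSON-object-per-line text into a JSON array string.

    Hypothetical helper that mimics the two text replacements; not a NiFi API.
    """
    # Step 1: replace each line break with a comma.
    # Leading/trailing whitespace is stripped first so no dangling comma remains.
    joined = re.sub(r"\r?\n", ",", text.strip())
    # Step 2: surround the entire text with [ ].
    return "[" + joined + "]"

sample = (
    '{"code": "6", "other": {"feild1": "data1"}}\n'
    '{"code": "7", "other": {"feild2": "data2"}}\n'
    '{"code": "8", "other": {"feild3": "data3"}}\n'
)

# The result parses as a single JSON array holding all records in the batch.
records = json.loads(ndjson_to_array_text(sample))
print(len(records))
print(records[0]["code"])
```

Raw line breaks cannot appear inside a valid JSON string, so replacing every line break is safe as long as each record really sits on one line.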
Hope that helps.
Thanks
Created 05-13-2023 07:35 AM
Hi @SAMSAL ,
We have a high TPS, so we are constrained to batch processing. We do not want to split the records line by line, and we have already split our flow files into batches of 1000 records each. We therefore need a solution for batch processing in NiFi.
Created 05-14-2023 02:29 AM
Dear @SAMSAL ,
Because we have semi-structured JSON data in the "other" field, we do not have a fixed schema to set in the JsonTreeReader. We want a solution for batch processing of the flowfiles with that assumption.