Created on 05-13-2023 01:10 AM - edited 05-13-2023 03:36 AM
Dear Experts,
We want to learn how to perform batch processing on semi-structured JSON flow files in NiFi. We know that we can use the JoltTransformRecord, UpdateRecord, and QueryRecord processors, for which we need to configure record reader and writer controller services. However, we are unsure how to set up those reader and writer controllers for semi-structured data schemas.
Below is a multi-line JSON flow file in which the "code" field is fixed and the "other" field is an arbitrarily nested JSON object. We want to query the "code" field in batch mode and carry the "other" field through to the resulting flow files for processing in subsequent processors. How should we configure the reader and writer controllers to achieve this?
Regards,
{"code": "6", "other": {"feild1": "data1"}}
{"code": "7", "other": {"feild2": "data2"}}
{"code": "8", "other": {"feild3": "data3"}}
Created 05-13-2023 06:52 AM
Hi @Arash ,
I'm not sure there is a reader/writer that can work with semi-structured data. You could develop a custom reader/writer, but that would take some effort. Since your input arrives as multiple JSON records, one per line, you can either use a SplitText processor to split each JSON record into its own flowfile and process each record independently, or convert the input into a JSON array using two ReplaceText processors (see the screenshot below), then use QueryRecord and UpdateRecord with a JsonTreeReader/JsonRecordSetWriter.
First ReplaceText: replace each line break with a comma.
Second ReplaceText: surround the entire text with [ and ] (a configuration sketch follows below).
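A minimal sketch of how the two ReplaceText processors could be configured (the property names are the standard ReplaceText properties; the exact regexes are assumptions based on the one-record-per-line sample above):
First ReplaceText:
  Replacement Strategy: Regex Replace
  Evaluation Mode: Entire text
  Search Value: \n(?=\{)
  Replacement Value: , (a single comma)
Second ReplaceText:
  Replacement Strategy: Regex Replace
  Evaluation Mode: Entire text
  Search Value: (?s)^(.*)$
  Replacement Value: [$1]
The lookahead in the first regex avoids turning a trailing newline into a trailing comma, and the second regex wraps the whole content in [ and ]. A QueryRecord dynamic property (named "matched" here purely for illustration) could then hold a query such as SELECT * FROM FLOWFILE WHERE code = '6', with a JsonTreeReader and JsonRecordSetWriter configured as the record reader/writer.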
Hope that helps.
Thanks
Created 05-13-2023 07:35 AM
Hi @SAMSAL ,
We have a high TPS, so we are constrained to batch processing. We therefore do not want to split records line by line; our flow files are already split into batches of 1000 records. We need a solution for batch processing in NiFi.
Created 05-14-2023 02:29 AM
Dear @SAMSAL ,
Because the "other" field contains semi-structured JSON, we do not have a fixed schema to configure in the JsonTreeReader. We are looking for a way to batch-process the flowfiles under that assumption.
Created 06-14-2023 02:26 PM
This could possibly be achieved via an InvokeScriptedProcessor, but I would need to know the source data and the expected output. For example, taking what you posted, if you want to filter on code=6 and code=8 and emit only the values of "other" as individual FlowFiles, then Groovy-based code like the following could achieve that:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {
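// Maximum number of FlowFiles to pull from the incoming queue in a single onTrigger call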
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
PropertyDescriptor FILTER_CODES = new PropertyDescriptor.Builder()
.name("FILTER_CODES")
.displayName("Filter Codes")
.description("Codes to Filter On")
.required(true)
.defaultValue("6,8")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE, FILTER_CODES]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
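// Pull up to BATCH_SIZE FlowFiles from the incoming queue as one batch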
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
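// Parse the comma-separated filter codes into integers, ignoring non-numeric entries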
String filterCodesString = context.getProperty(FILTER_CODES).getValue()
List<Integer> filterCodes = filterCodesString.split(",").findAll { it.trim().matches("\\d+") }.collect { it as Integer }
flowFiles.each { flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
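// Read the FlowFile content line by line; each non-empty line is expected to be one standalone JSON record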
session.read(flowFile, { inputStream ->
inputStream.eachLine { line ->
if (line?.trim()) {
Map dataMap = jsonSlurper.parseText(line)
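// Keep only records whose code is in the configured filter list, and emit each matching "other" object as its own FlowFile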
if (filterCodes.contains(dataMap.code.toInteger())) {
FlowFile newFlowFile = session.create()
newFlowFile = session.write(newFlowFile,
{
outputStream -> outputStream.write(jsonOutput.toJson(dataMap.other).getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
session.transfer(newFlowFile, REL_SUCCESS)
}
}
}
} as InputStreamCallback)
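// Drop the original multi-record FlowFile; matching records have been re-emitted individually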
session.remove(flowFile)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
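To try this (assuming a standard InvokeScriptedProcessor setup), you would set the Script Engine property to Groovy and paste the script into the Script Body property (or point Script File at it). The final processor = new GroovyProcessor() line is what lets InvokeScriptedProcessor discover the processor instance, so it needs to stay at the end of the script.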