Created 06-02-2023 11:57 AM
I have this data and I need to split the content of VisitList into separate flowfiles, one by one.
What should I write in the JsonPath Expression so that the data gets split? I tried $.* but it keeps the given data in a single file, while I want to split on the basis of VisitList.
Data
[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "HR",
        "S2": "Accountant"
      },
      {
        "S1": "Manager",
        "S2": "Sr. Manager"
      }
    ]
  }
]
I want to split the data into flowfiles like this. 1st flowfile:
[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "HR",
        "S2": "Accountant"
      }
    ]
  }
]
2nd flowfile:
[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "Manager",
        "S2": "Sr. Manager"
      }
    ]
  }
]
I tried putting $.* in the JsonPath Expression, but it didn't work the way I want.
Created 06-02-2023 03:37 PM
@Dracile Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our NiFi experts @steven-matison and @cotopaul, who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres
Created 06-02-2023 03:45 PM
I do not think that SplitJson is the correct processor for you. What you are trying to achieve might be possible using some JOLT transformations. Unfortunately, I am not near a computer to test a correct transformation, but I know that @SAMSAL has plenty of experience in using JOLT and he might be able to further assist you.
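As a rough, untested starting point (field names taken from the sample data above, and assuming a single parent object), a JOLT shift spec along these lines should copy the parent fields next to each VisitList entry:
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "VisitList": {
          "*": {
            "@(2,employer)": "[&].employer",
            "@(2,loc_id)": "[&].loc_id",
            "@(2,topId)": "[&].topId",
            "@": "[&].VisitList[]"
          }
        }
      }
    }
  }
]
A SplitJson on the resulting top-level array would then give one flowfile per entry.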
Created 06-03-2023 09:50 AM
But I need to send the files one by one. I have split only the content of VisitList (just the two rows, S1 and S2), which got further split into 2 files using SplitJson, but how can I process those two files one by one?
Created 06-05-2023 06:14 AM
@Dracile If you are looking to iterate through the results inside your upstream JSON object, you need QueryRecord with a JSON Reader and Writer. This allows you to provide the upstream schema (reader), the downstream schema (writer), and a query against the flowfile. This will unfortunately lose the original object values.
You can find an example here:
https://github.com/cldr-steven-matison/NiFi-Templates/blob/main/QueryRecord_Sample.json
You will need to modify the JSON object in GenerateFlowFile, then adjust the Reader/Writer and point the results query at the $.VisitList[] array. Once you have this lil mini test working, take the logic to your final flow.
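As a rough sketch of that setup (the reader/writer names and the query are placeholders to adapt):
QueryRecord
  Record Reader             : JsonTreeReader
  Record Writer             : JsonRecordSetWriter
  (dynamic property) split  : SELECT * FROM FLOWFILE   -- replace with a query targeting the VisitList entries
The name of the dynamic property becomes an outbound relationship carrying the query results.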
Created 06-08-2023 08:09 AM
@Dracile Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
Regards,
Diana Torres
Created 06-14-2023 06:57 AM
@Dracile - Instead, I recommend using the ForkRecord processor.
Like @steven-matison mentioned, create a Record Reader and Record Writer, add another property with the record path /VisitList, set Mode to Split, and set Include Parent Fields to true.
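Roughly, the ForkRecord configuration would look like this (the reader/writer names below are just example JSON record services):
ForkRecord
  Record Reader             : JsonTreeReader
  Record Writer             : JsonRecordSetWriter
  Mode                      : Split
  Include Parent Fields     : true
  (dynamic property) visits : /VisitList
The dynamic property value is the record path to the array you want to fork on.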
This will result in the next flowfile looking like this -
[ {
"employer" : "98765",
"loc_id" : "312",
"topId" : "Management",
"VisitList" : [ {
"S1" : "HR",
"S2" : "Accountant"
} ]
}, {
"employer" : "98765",
"loc_id" : "312",
"topId" : "Management",
"VisitList" : [ {
"S1" : "Manager",
"S2" : "Sr. Manager"
} ]
} ]
Then you could split on $ using a SplitJson processor, or even better, continue using more record-oriented processors for better performance 🙂
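For that final SplitJson step, only one property would need to be set:
SplitJson
  JsonPath Expression : $
Each element of the top-level array then comes out as its own flowfile.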
Created 06-14-2023 12:33 PM
I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }

    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }

    Collection<ValidationResult> validate(ValidationContext context) { null }

    PropertyDescriptor getPropertyDescriptor(String name) { null }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }

    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]

                // Parse the incoming FlowFile content as JSON (a list of parent objects)
                List data = null
                session.read(flowFile, {
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)

                // Emit one new FlowFile per VisitList entry, copying the parent fields alongside it
                data.each { entry ->
                    entry.VisitList.each { visit ->
                        Map newData = [:]
                        newData.put("employer", entry.employer)
                        newData.put("loc_id", entry.loc_id)
                        newData.put("topId", entry.topId)
                        newData.put("VisitList", [visit])

                        FlowFile newFlowFile = session.create()
                        newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
                        newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                        session.transfer(newFlowFile, REL_SUCCESS)
                    }
                }

                // The original FlowFile has been fully forked, so drop it
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
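To try it, set Script Engine to Groovy on the InvokeScriptedProcessor and paste the script into the Script Body property; the processor = new GroovyProcessor() line at the end is what InvokeScriptedProcessor picks up as the scripted processor instance.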