How to split particular data within JSON using the SplitJson processor in Apache NiFi?
Labels: Apache NiFi
Created 06-02-2023 11:57 AM
I have this data and I need to split the contents of VisitList into separate FlowFiles, one entry at a time.
What should I put in the JsonPath Expression so that the data gets split? I tried $.* but that keeps the given data in a single file, while I want to split on the basis of VisitList.
Data:
[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "HR",
        "S2": "Accountant"
      },
      {
        "S1": "Manager",
        "S2": "Sr. Manager"
      }
    ]
  }
]
I want to split the data into files like this. 1st FlowFile:
[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "HR",
        "S2": "Accountant"
      }
    ]
  }
]
2nd FlowFile:
[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "Manager",
        "S2": "Sr. Manager"
      }
    ]
  }
]
I tried putting $.* in the JsonPath Expression, but it didn't work the way I want.
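For reference, if I instead point SplitJson at the inner array (for example, JsonPath Expression $[0].VisitList), it does split into two FlowFiles, but each one contains only the inner object, like
{
  "S1": "HR",
  "S2": "Accountant"
}
and the employer, loc_id and topId fields are lost, which is not what I want either.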
Created 06-02-2023 03:37 PM
@Dracile Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our NiFi experts @steven-matison and @cotopaul, who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres, Community Moderator
Created 06-02-2023 03:45 PM
I do not think that SplitJson is the correct processor for you. What you are trying to achieve might be possible using some JOLT transformations. Unfortunately, I am not near a computer to test a correct transformation, but I know that @SAMSAL has plenty of experience with JOLT and he might be able to further assist you.
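Purely from memory and untested (so treat it as a sketch rather than a working spec), a shift operation along these lines is the usual pattern for copying parent fields down into each VisitList entry. Note it assumes a single object in the top-level array, since the visit index is reused as the output array index:
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "VisitList": {
          "*": {
            "@(2,employer)": "[&].employer",
            "@(2,loc_id)": "[&].loc_id",
            "@(2,topId)": "[&].topId",
            "@": "[&].VisitList[]"
          }
        }
      }
    }
  }
]
That should leave you with one array element per visit, each still carrying employer/loc_id/topId, which you could then break apart with SplitJson on $.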
Created 06-03-2023 09:50 AM
But I need to send the files one by one. I have split only the content of VisitList (just the two rows S1 and S2), which SplitJson further split into two files, but how can I process those two files one by one?
Created 06-05-2023 06:14 AM
@Dracile If you are looking to iterate through the results inside of your upstream JSON object, you need QueryRecord with a JSON Reader and Writer. This allows you to provide the upstream schema (reader), the downstream schema (writer), and a query against the FlowFile. This will unfortunately lose the original object values.
You can find an example here:
https://github.com/cldr-steven-matison/NiFi-Templates/blob/main/QueryRecord_Sample.json
You will need to modify the JSON object in GenerateFlowFile, then adjust the Reader/Writer and the results query against the $.VisitList[] array. Once you have this lil mini test working, take the logic to your final flow.
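As a very rough sketch of the query itself (untested; QueryRecord is backed by Calcite SQL, and I have not verified this exact UNNEST syntax against your NiFi version), a dynamic property named something like "visits" could hold:
SELECT employer, loc_id, topId, v.S1, v.S2
FROM FLOWFILE
CROSS JOIN UNNEST(VisitList) AS v (S1, S2)
This flattens each visit onto its parent fields instead of keeping the nested VisitList array, which is what I meant about losing the original object values.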
Created 06-08-2023 08:09 AM
@Dracile Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
Regards,
Diana Torres, Community Moderator
Created 06-14-2023 06:57 AM
@Dracile - Instead, I recommend using the ForkRecord processor.
Like @steven-matison mentioned, create a Record Reader and a Record Writer, then add another property with the record path /VisitList, set Mode to Split, and set Include Parent Fields to true (see the configuration sketch at the end of this post).
This will result in the next FlowFile looking like this:
[ {
"employer" : "98765",
"loc_id" : "312",
"topId" : "Management",
"VisitList" : [ {
"S1" : "HR",
"S2" : "Accountant"
} ]
}, {
"employer" : "98765",
"loc_id" : "312",
"topId" : "Management",
"VisitList" : [ {
"S1" : "Manager",
"S2" : "Sr. Manager"
} ]
} ]
Then you could split on $ using a SplitJson processor, or even better, continue using more record-oriented processors for better performance 🙂
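To spell out the configuration sketched above (property names quoted from memory, so double-check them against the ForkRecord documentation):
ForkRecord
  Record Reader         = JsonTreeReader
  Record Writer         = JsonRecordSetWriter
  Mode                  = Split
  Include Parent Fields = true
  fork (user-defined property; the name is up to you) = /VisitList
SplitJson (if you follow up with the split)
  JsonPath Expression = $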
Created 06-14-2023 12:33 PM
I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
// NiFi API imports needed for the classes referenced below
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    // How many queued FlowFiles to pull per onTrigger invocation
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }

    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }

    Collection<ValidationResult> validate(ValidationContext context) { null }

    PropertyDescriptor getPropertyDescriptor(String name) { null }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }

    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]

                // Parse the incoming FlowFile content as a JSON array
                List data = null
                session.read(flowFile, {
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)

                // For each VisitList entry, emit a new FlowFile that copies the
                // parent fields and wraps the single visit in a one-element array
                data.each { entry ->
                    entry.VisitList.each { visit ->
                        Map newData = [:]
                        newData.put("employer", entry.employer)
                        newData.put("loc_id", entry.loc_id)
                        newData.put("topId", entry.topId)
                        newData.put("VisitList", [visit])
                        FlowFile newFlowFile = session.create()
                        newFlowFile = session.write(newFlowFile, { outputStream ->
                            outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8))
                        } as OutputStreamCallback)
                        newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                        session.transfer(newFlowFile, REL_SUCCESS)
                    }
                }

                // The original FlowFile has been fully split, so drop it
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
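To try this, set InvokeScriptedProcessor's Script Engine property to Groovy and paste the code above into its Script Body property; the success and failure relationships defined in the script then become available on the processor.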