Member since
01-06-2022
4
Posts
2
Kudos Received
0
Solutions
01-19-2022
07:51 AM
For others who stumble across this: I ended up delving into scripting processors and implemented a script that does the batching:
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
// NiFi ExecuteScript (ECMAScript) batching script.
//
// Pulls up to BATCH_SIZE flow files from the input queue, reads each one as
// UTF-8 JSON, collects the value of its 'SourceDataElementValue' property, and
// emits a single new flow file whose 'tml_list' attribute holds the collected
// values joined by commas. Successfully parsed inputs are removed from the
// session; inputs whose content is not valid JSON are routed to REL_FAILURE
// instead of aborting the whole batch.
//
// Relies on the bindings supplied by the scripting processor (session, log,
// REL_SUCCESS, REL_FAILURE) and on IOUtils being loaded earlier in the script.
// `var` is used deliberately: the script targets the processor's ES5-level
// script engine.
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
// Resolve the callback interface once rather than on every loop iteration.
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");

// Maximum number of flow files combined into one output flow file.
var BATCH_SIZE = 50;

// Grab up to BATCH_SIZE flow files from the input queue (or whatever's available).
var flowFileList = session.get(BATCH_SIZE);
if (!flowFileList.isEmpty()) {
    var ids = [];
    // Index-based iteration over the returned Java List avoids the
    // engine-specific `for each` extension.
    for (var i = 0; i < flowFileList.size(); i++) {
        var flowFile = flowFileList.get(i);
        // Set by the read callback below. The callback is invoked synchronously
        // by session.read, so sharing this function-scoped variable is safe.
        var parsedOk = true;
        // Create a new InputStreamCallback, passing in a function to define the interface method.
        session.read(flowFile, new InputStreamCallback(function (inputStream) {
            // Get the JSON out of the flow file and convert it to a JS object.
            var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            var obj;
            try {
                obj = JSON.parse(text);
            } catch (e) {
                // Malformed content: flag it so the flow file is routed to
                // failure rather than throwing out of the callback.
                parsedOk = false;
                log.warn('Flow file content is not valid JSON; routing to failure: ' + e);
                return;
            }
            // Guard against JSON `null` and primitives, and do not trust a
            // `hasOwnProperty` key that may be present in the payload itself.
            if (obj !== null && typeof obj === 'object' &&
                    Object.prototype.hasOwnProperty.call(obj, 'SourceDataElementValue')) {
                ids.push(obj.SourceDataElementValue);
            }
        }));
        if (parsedOk) {
            // Eat the flow file once its ID (if any) has been extracted.
            session.remove(flowFile);
        } else {
            session.transfer(flowFile, REL_FAILURE);
        }
    }
    if (ids.length > 0) {
        // Explicit separator; matches the default used by Array.prototype.join().
        var attributeValue = ids.join(',');
        var outputFlowFile = session.create();
        outputFlowFile = session.putAttribute(outputFlowFile, 'tml_list', attributeValue);
        session.transfer(outputFlowFile, REL_SUCCESS);
    }
}
... View more