Member since 01-06-2022 | 4 Posts | 2 Kudos Received | 0 Solutions
01-19-2022 07:51 AM
For others who stumble across this: I ended up delving into scripting processors and implemented a script that does the batching:

```javascript
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");

// Grab up to 50 flow files from the input queue (or whatever's available)
var flowFileList = session.get(50);
if (!flowFileList.isEmpty()) {
    var ids = [];
    // Nashorn's "for each" iterates the java.util.List directly
    for each (var flowFile in flowFileList) {
        // Create a new InputStreamCallback, passing in a function to define the interface method
        session.read(flowFile, new InputStreamCallback(function (inputStream) {
            // Get the JSON out of the flowfile and convert it to a JS object
            var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            var obj = JSON.parse(text);
            if (obj.hasOwnProperty('SourceDataElementValue')) {
                ids.push(obj.SourceDataElementValue);
            }
        }));
        // Eat the flowfile after the TML ID is extracted.
        session.remove(flowFile);
    }
    if (ids.length > 0) {
        // Emit one new flowfile carrying all collected IDs in the 'tml_list' attribute
        var attributeValue = ids.join();
        var outputFlowFile = session.create();
        outputFlowFile = session.putAttribute(outputFlowFile, 'tml_list', attributeValue);
        session.transfer(outputFlowFile, REL_SUCCESS);
    }
}
```
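To check the extraction-and-join logic without a running NiFi instance, the callback body can be pulled out into a plain function. This is a minimal sketch, not part of the original script: `extractIdList` is a hypothetical name, and each string in the input array stands in for one flowfile's JSON content. It runs under Node or any standard JavaScript engine.

```javascript
// Hypothetical helper mirroring the InputStreamCallback body of the script above.
// Each element of jsonTexts stands in for one flowfile's JSON content.
function extractIdList(jsonTexts) {
  var ids = [];
  jsonTexts.forEach(function (text) {
    var obj = JSON.parse(text);
    // Only keep records that actually carry the field, as the script does.
    if (Object.prototype.hasOwnProperty.call(obj, "SourceDataElementValue")) {
      ids.push(obj.SourceDataElementValue);
    }
  });
  // join() with no argument uses "," as the separator and adds no spaces.
  return ids.join();
}

var sample = [
  '{"SourceDataElementValue": 123}',
  '{"SourceDataElementValue": 124}',
  '{"SomethingElse": 1}'
];
console.log(extractIdList(sample)); // prints 123,124
```

Because `join()` without an argument produces a value like `123,124`, the `tml_list` attribute can be placed straight into an `IN (...)` clause when the IDs are numeric.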
01-12-2022 12:15 PM (1 Kudo)
I was looking at the scripting capabilities. Is something like the outline below possible? (Pseudocode, obviously.)

```
while (file = get new flow file from input) {
    data = ...extract field...
    append data to string
    if (extracted more than 50 things) {
        emit new flowfile with string as attribute
        clear string
    }
}
```

I've seen plenty of examples of what I'd consider a "transform a single flowfile into something else" script, but none where the script fetches another flowfile from the session with "flowfile = session.get();", and it's unclear whether you can do that multiple times in the same script.
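The outlined loop can be sketched in plain JavaScript. This is a minimal sketch under assumptions: `records` stands in for the incoming flowfiles, `extract` for the field-extraction step, and `batchValues` is a hypothetical name; no NiFi API is involved. It emits a batch as soon as exactly `batchSize` values have been collected, and it flushes the leftover partial batch at the end, which a loop that only emits inside the `if` would drop.

```javascript
// Hypothetical sketch of the outlined loop (names are illustrative, not NiFi API).
// records: stand-ins for incoming flowfiles; extract: pulls one value per record.
function batchValues(records, extract, batchSize) {
  var batches = [];
  var current = [];
  for (var i = 0; i < records.length; i++) {
    current.push(extract(records[i]));
    if (current.length === batchSize) {
      batches.push(current.join()); // emit "v1,v2,...,vN"
      current = [];                 // start a fresh batch
    }
  }
  // Flush the final partial batch so trailing records are not lost.
  if (current.length > 0) {
    batches.push(current.join());
  }
  return batches;
}

var records = [{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }, { id: 5 }];
console.log(batchValues(records, function (r) { return r.id; }, 2));
// [ '1,2', '3,4', '5' ]
```

Inside ExecuteScript the counter is usually unnecessary: `session.get(50)` already returns at most 50 flowfiles per invocation (fewer if the queue holds fewer), so each run of the script naturally yields one batch.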
01-11-2022 08:50 AM (1 Kudo)
I have a flow that extracts an ID from a flow file, then runs ExecuteSQL to fetch some additional data. Since this happens one record at a time, it's very inefficient: ~300 records per minute, which doesn't sound bad until it's processing a million records (roughly 55 hours at that rate).

Is there a way to run flow files through a processor and extract the required value in batches of 50 or 100, so that it produces a comma-separated value (like 123, 124, 125, 126, ... etc.) to pass to ExecuteSQL, so it returns data in larger batches instead of one at a time?

Currently, it's doing an EvaluateJsonPath, pulling that one value into a flowfile attribute so the ExecuteSQL processor can construct SQL like "SELECT * FROM TABLE_NAME WHERE KEY = ${key}". I'd like to modify that to "...WHERE KEY IN (${keyList})", where keyList is the comma-separated list created above. I'm not sure how to approach this, however.
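ExecuteSQL's SQL select query property accepts NiFi Expression Language evaluated against the incoming flowfile's attributes, so a query such as `SELECT * FROM TABLE_NAME WHERE KEY IN (${keyList})` can be used once `keyList` holds the comma-separated IDs. Because that substitution pastes the attribute text straight into the SQL, it may be worth validating the list first. The sketch below shows the expanded query plus such a check; it assumes the keys are integers, and `buildInQuery` is a hypothetical helper, not a NiFi function.

```javascript
// Hypothetical helper: expand a comma-separated key list into the batched query.
// TABLE_NAME and KEY are the placeholder names from the question.
// Assumes integer keys; anything else is rejected instead of being pasted into SQL.
function buildInQuery(keyList) {
  var keys = keyList.split(",").map(function (k) { return k.trim(); });
  keys.forEach(function (k) {
    if (!/^\d+$/.test(k)) {
      throw new Error("Non-numeric key in list: '" + k + "'");
    }
  });
  return "SELECT * FROM TABLE_NAME WHERE KEY IN (" + keys.join(", ") + ")";
}

console.log(buildInQuery("123, 124, 125"));
// SELECT * FROM TABLE_NAME WHERE KEY IN (123, 124, 125)
```

An empty list is rejected as well, which avoids sending the invalid `IN ()` to the database.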
Labels:
- Apache NiFi