I am trying to run a NiFi ExecuteScript processor whose script should take in a JSON message and parse its fields for further processing, but I cannot get it to work. I ran the script through the ExecuteScript tester created by this author:
https://funnifi.blogspot.com/2016/06/testing-executescript-processor-scripts.html
And it returned the following error:
cannot create an instance from the abstract interface org.apache.nifi.processor.io.StreamCallback
I cannot see what is wrong with the script, so could someone please help me find the error? Here is the script:
// NiFi ExecuteScript body (Nashorn engine, ECMAScript 5 syntax).
//
// Reads the incoming FlowFile's content as JSON and replaces it with a single
// text line of the form
//   ops_track postID=..,contentType=..,published=..,crawled=..,providerID=.., value=1 <timestamp>\n
// If the content cannot be processed, the original content is written back
// unchanged and the FlowFile is routed to REL_FAILURE; otherwise it is routed
// to REL_SUCCESS.
//
// `session`, `log`, `REL_SUCCESS` and `REL_FAILURE` are not defined here; they
// are expected to be bound by the ExecuteScript processor before the script runs.
var flowFile = session.get(); // declared with `var` so it is not an implicit global
if (flowFile != null) {
    var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

    // Fields copied from the JSON message into the output line, in output order.
    var FIELD_NAMES = ["postID", "contentType", "published", "crawled", "providerID"];

    // Set inside the callback; decides the relationship after session.write returns.
    var error = false;

    flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {
        // Read the whole FlowFile content up front so it can be echoed back on failure.
        var content = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        try {
            // JSON.parse may legitimately return null (content "null"); fall back to {}.
            var message = JSON.parse(content) || {};

            // Missing fields become the literal string "null". Because `||` is used,
            // other falsy values (0, false, "") are also rendered as "null".
            var pairs = [];
            for (var i = 0; i < FIELD_NAMES.length; i++) {
                var name = FIELD_NAMES[i];
                pairs.push(name + "=" + (message[name] || "null"));
            }

            // The original script multiplied an undefined variable `time` by 1000000,
            // which threw a ReferenceError that the catch block swallowed, so every
            // FlowFile ended up on REL_FAILURE. Appending six zeros to the epoch
            // milliseconds is the same scaling without floating-point rounding.
            // NOTE(review): assumes the timestamp should be "now"; if it is meant to
            // come from the message or a FlowFile attribute, read it from there instead.
            var timestamp = Date.now() + "000000";

            // NOTE(review): the trailing "," before " value=" is kept from the original
            // format; confirm that the consumer of this line really expects it.
            var line = "ops_track" + " " + pairs.join(",") + ","
                + " value=" + "1" + " "
                + timestamp + "\n";

            // Write output content
            outputStream.write(line.getBytes(StandardCharsets.UTF_8));
        } catch (e) {
            error = true;
            // Surface the cause instead of failing silently.
            log.error("Failed to transform FlowFile content: " + e);
            // Preserve the original content for whatever handles the failure route.
            outputStream.write(content.getBytes(StandardCharsets.UTF_8));
        }
    }));

    if (error) {
        session.transfer(flowFile, REL_FAILURE);
    } else {
        session.transfer(flowFile, REL_SUCCESS);
    }
}