Created 11-28-2022 03:00 PM
Hi all,
I'm having problems getting the JavaScript I wrote for ExecuteScript to work properly.
What I'm trying to do:
My flowfiles contain rows of data from a SELECT statement. Each flowfile is searched line by line for non-ASCII characters, and all non-ASCII characters that are found are replaced with empty strings before the data is loaded to a target database. I'd like to grab the value for the table's unique identifier column and write it to a new table, say "nifi_replaced_data" so that I can easily look in the source database and see the row with the non-ASCII character that NiFi replaced in the target database.
For example, I might be processing a table with the columns:
column_1, column_2, unique_id, column_4
And the flowfile, which is tab delimited, might look like:
somedata bad__character AA11BB22 someotherdata
otherdata no_bad_character CC33DD44 someotherdata
The process:
You may be wondering where the bad character is replaced with an empty string. This happens at a later point in the flow and is not dependent on this processor group.
Issues:
I currently have two problems, one with the first step in the process and one with the fourth.
Step #1 - When I run NiFi using a table that I know has one or more rows with a non-ASCII character, the ExtractText processor I'm using does not find any matches when looking at the entire flowfile - and thus nothing is routed to the "matched" relationship. I have the following configuration, but I'm wondering if I should be using a RouteText processor for this instead:
Step #4 - If I bypass the processor for step #1 and just split every flowfile by line without checking the entire thing first, I can find and replace non-ASCII characters, so I know for a fact that I have bad characters in the file. But my ExecuteScript processor errors on every line when attempting to get the value of the unique_id. I'm not sure if the problem is with reading the flowfile via InputStream, converting the line via IOUtils.toString, or splitting the flowfile on the tab delimiter. My hunch is that the IOUtils.toString method is throwing an exception when it encounters a character that is not valid UTF-8, and so my delimiter split + index lookup does nothing:
// ExecuteScript body: reads the flowfile content as a single UTF-8 string, splits it on tabs,
// and copies the field at position 'unique.id.index' into the 'unique.id.value' attribute.
// Flowfiles for which no field is found go to REL_FAILURE; all others go to REL_SUCCESS.
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
if(flowFile != null) {
// Get the name of the table
var table = flowFile.getAttribute('table.name');
// Get the index of the unique_id via the attribute
// NOTE(review): the attribute value is presumably a string such as "2"; it is used as-is as the array key below
var unique_id_index = flowFile.getAttribute('unique.id.index');
// Create a new InputStreamCallback, passing in a function to define the interface method
session.read(flowFile,
new InputStreamCallback(function(inputStream) {
try {
// Convert the single line of our flowfile to a UTF_8 encoded string
var line = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
}
catch(e) {
// The error is only logged; execution continues below with 'line' still undefined,
// so the split on the next statement would then fail as well
log.error('Error on toString', e)
}
// Split the delimited data into an array
var data = line.split('\t');
// Get the value of the unique_id, using the index
var unique_id = data[unique_id_index];
if (typeof unique_id === 'undefined') {
var ObjectArrayType = Java.type("java.lang.Object[]");
// NOTE(review): the array is created with length 1 but two slots (0 and 1) are written below
var objArray = new ObjectArrayType(1);
objArray[0] = table;
objArray[1] = line;
log.error('Error: could not find unique_id value for table {} and line {}', objArray);
// NOTE(review): transfer happens while still inside the read callback, i.e. before
// session.read has returned - confirm whether the session allows this
session.transfer(flowFile, REL_FAILURE)
}
else {
// NOTE(review): same concern as above - the flowfile is modified and transferred while
// its content stream is still being read
flowFile = session.putAttribute(flowFile, 'unique.id.value', unique_id)
session.transfer(flowFile, REL_SUCCESS)
}
// Explicitly closes the stream handed to the callback
// NOTE(review): possibly redundant if the session manages the stream itself - confirm
inputStream.close()
}));
}
I'm fairly new to NiFi, and while I'm a developer - JavaScript is not my specialty. If someone could point me in the right direction, I'd really appreciate it! I've been struggling with this for far too long...
Created 11-29-2022 02:04 PM
For anyone having a similar problem, here's how it was resolved.
The issue with #1 was fixed by switching from an ExtractText processor to a RouteOnContent processor. RouteOnContent is much simpler and easier to use - just create a property for routing the flowfile and add the regex.
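#4 was fixed by declaring unique_id outside the InputStreamCallback and moving the unique_id check, session.putAttribute and session.transfer calls out of the callback, so that the flowfile is only updated and transferred after session.read has returned.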
Full (working) script is below:
// ExecuteScript body: reads the flowfile content as a single UTF-8 string, splits it on tabs,
// and copies the field at position 'unique.id.index' into the 'unique.id.value' attribute.
// Flowfiles for which no field can be extracted go to REL_FAILURE; all others go to REL_SUCCESS.
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
// Every access to the flowfile stays inside this guard: when nothing is queued there is no
// flowfile to read attributes from.
if (flowFile != null) {
    // Get the name of the table
    var table = flowFile.getAttribute('table.name');
    // Get the index of the unique_id via the attribute
    var unique_id_index = flowFile.getAttribute('unique.id.index');
    // Both values are filled in by the read callback but consumed after it returns, so they
    // must live in the outer scope (the failure branch logs 'line').
    var line = null;
    var unique_id;
    // Only read inside the callback; attribute updates and transfers happen after session.read
    session.read(flowFile,
        new InputStreamCallback(function(inputStream) {
            try {
                // Convert the single line of our flowfile to a UTF_8 encoded string
                line = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            }
            catch(e) {
                log.error('Error on toString', e);
                // Nothing to split; leave unique_id undefined so the flowfile is routed to failure
                return;
            }
            // Split the delimited data into an array
            var data = line.split('\t');
            // Get the value of the unique_id, using the index
            unique_id = data[unique_id_index];
        }));
    if (typeof unique_id === 'undefined') {
        var ObjectArrayType = Java.type("java.lang.Object[]");
        // Two placeholders in the message, so two slots are needed
        var objArray = new ObjectArrayType(2);
        objArray[0] = table;
        objArray[1] = line;
        log.error('Error: could not find unique_id value for table {} and line {}', objArray);
        session.transfer(flowFile, REL_FAILURE);
    }
    else {
        flowFile = session.putAttribute(flowFile, 'unique.id.value', unique_id);
        session.transfer(flowFile, REL_SUCCESS);
    }
}
Created 11-29-2022 02:04 PM
For anyone having a similar problem, here's how it was resolved.
The issue with #1 was fixed by switching from an ExtractText processor to a RouteOnContent processor. RouteOnContent is much simpler and easier to use - just create a property for routing the flowfile and add the regex.
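#4 was fixed by declaring unique_id outside the InputStreamCallback and moving the unique_id check, session.putAttribute and session.transfer calls out of the callback, so that the flowfile is only updated and transferred after session.read has returned.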
Full (working) script is below:
// ExecuteScript body: reads the flowfile content as a single UTF-8 string, splits it on tabs,
// and copies the field at position 'unique.id.index' into the 'unique.id.value' attribute.
// Flowfiles for which no field can be extracted go to REL_FAILURE; all others go to REL_SUCCESS.
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
// Every access to the flowfile stays inside this guard: when nothing is queued there is no
// flowfile to read attributes from.
if (flowFile != null) {
    // Get the name of the table
    var table = flowFile.getAttribute('table.name');
    // Get the index of the unique_id via the attribute
    var unique_id_index = flowFile.getAttribute('unique.id.index');
    // Both values are filled in by the read callback but consumed after it returns, so they
    // must live in the outer scope (the failure branch logs 'line').
    var line = null;
    var unique_id;
    // Only read inside the callback; attribute updates and transfers happen after session.read
    session.read(flowFile,
        new InputStreamCallback(function(inputStream) {
            try {
                // Convert the single line of our flowfile to a UTF_8 encoded string
                line = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            }
            catch(e) {
                log.error('Error on toString', e);
                // Nothing to split; leave unique_id undefined so the flowfile is routed to failure
                return;
            }
            // Split the delimited data into an array
            var data = line.split('\t');
            // Get the value of the unique_id, using the index
            unique_id = data[unique_id_index];
        }));
    if (typeof unique_id === 'undefined') {
        var ObjectArrayType = Java.type("java.lang.Object[]");
        // Two placeholders in the message, so two slots are needed
        var objArray = new ObjectArrayType(2);
        objArray[0] = table;
        objArray[1] = line;
        log.error('Error: could not find unique_id value for table {} and line {}', objArray);
        session.transfer(flowFile, REL_FAILURE);
    }
    else {
        flowFile = session.putAttribute(flowFile, 'unique.id.value', unique_id);
        session.transfer(flowFile, REL_SUCCESS);
    }
}