Support Questions

Find answers, ask questions, and share your expertise

NiFi ExecuteScript - extracting and storing a value from a single line

avatar
New Contributor

Hi all,

 

I'm having problems getting the JavaScript I wrote for ExecuteScript to work properly.

 

What I'm trying to do:

My flowfiles contain rows of data from a SELECT statement. Each flowfile is searched line by line for non-ASCII characters, and all non-ASCII characters that are found are replaced with empty strings before the data is loaded to a target database. I'd like to grab the value for the table's unique identifier column and write it to a new table, say "nifi_replaced_data" so that I can easily look in the source database and see the row with the non-ASCII character that NiFi replaced in the target database.

 

For example, I might be processing a table with the columns:

column_1, column_2, unique_id, column_4

And the flowfile, which is tab delimited, might look like:

somedata    bad__character   AA11BB22   someotherdata
otherdata no_bad_character CC33DD44 someotherdata

 

The process:

  1. Each flowfile is checked in it's entirety for non-ASCII characters using regex. Only files that have one or more bad characters are routed to success / matched.
  2. An ExecuteScript processor is used to look at the columns attribute, and determine the index of the unique_id column. The unique_id column is guaranteed in all tables, but its index position will vary based on the table being processed. The index is stored in an attribute unique.id.index.
  3. The flowfile is split line by line using SplitText, and then RouteText is used to determine if the row / line contains a non-ASCII character. If found, the flowfile is routed to the 2nd ExecuteScript processor. 
  4. This ExecuteScript processor reads the flowfile via InputSteam, and splits it by the tab delimiter. It then uses the unique.id.index attribute to determine where the unique_id value is in the array, and writes the value to a new attribute unique.id.value.
  5. ExecuteSQL writes a row in the nifi_replaced_data table in the target database, using the unique_id value that was added as an attribute in step 4. It also writes the source table name, which is held in a separate attribute. 

You may be wondering where the bad character is replaced with an empty string. This happens at a later point in the flow and is not dependent on this processor group.

 

Issues:

I currently have two problems, one with the first step in the process and one with the fourth. 

 

Step #1 - When I run NiFi using a table that I know has one or more rows with a non-ASCII character, the ExtractText processor I'm using does not find any matches when looking at the entire flowfile - and thus nothing is routed on matched. I have the following configuration, but I'm wondering if I should be using a RouteText processor for this instead:

DANiFi_0-1669675267163.png

 

Step #4 - if I bypass the processor for step #1 and just split every flowfile by line without checking the entire thing first, I can find and replace non-ASCII characters, so I know for a fact that I have bad characters in the file. But, my ExecuteScript processor errors on every line when attempting to get the value of the unique_id. I'm not sure if the problem is with reading the flowfile via InputSteam, converting the line via IOUtils.ToString, or splitting the flowfile via tab delimiter. My hunch is that the IOUtils.ToString method is throwing an exception when it encounters a character that is not UTF-8, and so my delimiter split + index lookup does nothing:

var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if(flowFile != null) {

// Get the name of the table
var table = flowFile.getAttribute('table.name');

// Get the index of the unique_id via the attribute
var unique_id_index = flowFile.getAttribute('unique.id.index');

// Create a new InputStreamCallback, passing in a function to define the interface method
session.read(flowFile,
new InputStreamCallback(function(inputStream) {
try {
// Convert the single line of our flowfile to a UTF_8 encoded string
var line = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
}
catch(e) {
log.error('Error on toString', e)
}
// Split the delimited data into an array
var data = line.split('\t');
// Get the value of the unique_id, using the index
var unique_id = data[unique_id_index];

if (typeof unique_id === 'undefined') {
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(1);
objArray[0] = table;
objArray[1] = line;
log.error('Error: could not find unique_id value for table {} and line {}', objArray);
session.transfer(flowFile, REL_FAILURE)
}
else {
flowFile = session.putAttribute(flowFile, 'unique.id.value', unique_id)
session.transfer(flowFile, REL_SUCCESS)
}
inputStream.close()
}));
}

 

I'm fairly new to NiFi, and while I'm a developer - JavaScript is not my specialty. If someone could point me in the right direction, I'd really appreciate it! I've been struggling with this for far too long...

1 ACCEPTED SOLUTION

avatar
New Contributor

For anyone having a similar problem, here's how it was resolved.

 

The issue with #1 was fixed by switching from an ExtractText processor to a RouteOnContent processor. RouteOnContent is much more simplistic and easy to use - just create a property for routing the flowfile and add the regex.

 

#4 was fixed by:

  • Moving the unique_id_index variable outside of the callback so it could be used afterwards.
    • Same with the unique_id variable ^
  • Closing the InputStream before routing the flowfile.

Full (working) script is below:

var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();

// Get the name of the table
var table = flowFile.getAttribute('table.name');

// Get the index of the unique_id via the attribute
var unique_id_index = flowFile.getAttribute('unique.id.index');
var unique_id;

if(flowFile != null) {
// Create a new InputStreamCallback, passing in a function to define the interface method
session.read(flowFile,
new InputStreamCallback(function(inputStream) {
try {
// Convert the single line of our flowfile to a UTF_8 encoded string
var line = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
}
catch(e) {
log.error('Error on toString', e)
}
// Split the delimited data into an array
var data = line.split('\t');
// Get the value of the unique_id, using the index
unique_id = data[unique_id_index];
}));

if (typeof unique_id === 'undefined') {
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(1);
objArray[0] = table;
objArray[1] = line;
log.error('Error: could not find unique_id value for table {} and line {}', objArray);
session.transfer(flowFile, REL_FAILURE)
}
else {
flowFile = session.putAttribute(flowFile, 'unique.id.value', unique_id)
session.transfer(flowFile, REL_SUCCESS)
}
}

 

View solution in original post

1 REPLY 1

avatar
New Contributor

For anyone having a similar problem, here's how it was resolved.

 

The issue with #1 was fixed by switching from an ExtractText processor to a RouteOnContent processor. RouteOnContent is much more simplistic and easy to use - just create a property for routing the flowfile and add the regex.

 

#4 was fixed by:

  • Moving the unique_id_index variable outside of the callback so it could be used afterwards.
    • Same with the unique_id variable ^
  • Closing the InputStream before routing the flowfile.

Full (working) script is below:

var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();

// Get the name of the table
var table = flowFile.getAttribute('table.name');

// Get the index of the unique_id via the attribute
var unique_id_index = flowFile.getAttribute('unique.id.index');
var unique_id;

if(flowFile != null) {
// Create a new InputStreamCallback, passing in a function to define the interface method
session.read(flowFile,
new InputStreamCallback(function(inputStream) {
try {
// Convert the single line of our flowfile to a UTF_8 encoded string
var line = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
}
catch(e) {
log.error('Error on toString', e)
}
// Split the delimited data into an array
var data = line.split('\t');
// Get the value of the unique_id, using the index
unique_id = data[unique_id_index];
}));

if (typeof unique_id === 'undefined') {
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(1);
objArray[0] = table;
objArray[1] = line;
log.error('Error: could not find unique_id value for table {} and line {}', objArray);
session.transfer(flowFile, REL_FAILURE)
}
else {
flowFile = session.putAttribute(flowFile, 'unique.id.value', unique_id)
session.transfer(flowFile, REL_SUCCESS)
}
}