11-29-2022 02:04 PM
For anyone having a similar problem, here's how it was resolved.

The issue with #1 was fixed by switching from an ExtractText processor to a RouteOnContent processor. RouteOnContent is much simpler and easier to use: just create a property for routing the flowfile and add the regex.

#4 was fixed by:

- Moving the unique_id_index variable outside of the callback so it could be used afterwards.
- Doing the same with the unique_id variable.
- Closing the InputStream before routing the flowfile.

Full (working) script is below:

    var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

    var flowFile = session.get();

    // Only touch the flowfile once we know the session returned one
    if (flowFile != null) {
        // Get the name of the table
        var table = flowFile.getAttribute('table.name');
        // Get the index of the unique_id via the attribute
        var unique_id_index = flowFile.getAttribute('unique.id.index');
        // Declared outside the callback so they can be used after it returns
        var line;
        var unique_id;

        // Create a new InputStreamCallback, passing in a function to define the interface method
        session.read(flowFile, new InputStreamCallback(function(inputStream) {
            try {
                // Convert the single line of our flowfile to a UTF_8 encoded string
                line = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
                // Split the delimited data into an array
                var data = line.split('\t');
                // Get the value of the unique_id, using the index
                unique_id = data[unique_id_index];
            } catch(e) {
                log.error('Error on toString', e);
            }
        }));

        if (typeof unique_id === 'undefined') {
            // Two slots, one for each {} placeholder in the log message
            var ObjectArrayType = Java.type("java.lang.Object[]");
            var objArray = new ObjectArrayType(2);
            objArray[0] = table;
            objArray[1] = line;
            log.error('Error: could not find unique_id value for table {} and line {}', objArray);
            session.transfer(flowFile, REL_FAILURE);
        } else {
            flowFile = session.putAttribute(flowFile, 'unique.id.value', unique_id);
            session.transfer(flowFile, REL_SUCCESS);
        }
    }
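If you want to check the split-and-index step on its own before wiring it into NiFi, something like the sketch below may help. It is not part of the script above: `extractUniqueId` is a hypothetical helper name, and it assumes the index arrives as a string (as flowfile attributes do) and that the line is tab-delimited. It uses `console.log`, so it is meant to be run in Node or a browser console, not inside ExecuteScript.

```javascript
// Hypothetical helper (not from the original script): returns one field of a
// tab-delimited line, or undefined if the index is missing, not a number,
// negative, or past the end of the line.
function extractUniqueId(line, indexAttr) {
    // Attributes are strings, so convert the index explicitly
    var index = parseInt(indexAttr, 10);
    if (typeof line !== 'string' || isNaN(index) || index < 0) {
        return undefined;
    }
    var fields = line.split('\t');
    return fields[index];
}

console.log(extractUniqueId('42\tabc\t2022-11-29', '0')); // prints 42
console.log(extractUniqueId('42\tabc', '5'));             // prints undefined
```

An `undefined` result here corresponds to the case the script routes to failure, so it is a quick way to see which index values and lines would end up there.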