Support Questions
Best approach to re-merging flowfiles in NiFi


Contributor

I'm using "SplitAvro" prior to "ConvertAvroToJSON" so that blob objects too large to convert via varchar in a SQL stream can still be ingested into a flowfile. I then use several "SplitContent" processors to isolate the object that needs special handling by a custom "ExecuteScript", which uses "JSON.stringify" to produce a string that any subsequent JSON-related processor can handle. Can I use MergeContent (or something similar) to re-merge the customized string back into the sibling flowfile that contains the rest of the JSON structure? Is the processor dynamic enough to pair back to a separate flowfile using the uuid (or something similar) and the 'Defragment' Merge Strategy? Or could I create a dynamic variable in the JSON-structure flowfile that maps back to the corresponding customized string via some kind of iterative process? Thank you for any hints or suggestions you can provide. ~Sean

3 REPLIES

Re: Best approach to re-merging flowfiles in NiFi

MergeContent in Defragment mode would work if you were only merging back one level of splits, so if you have SplitAvro -> ConvertAvroToJson -> ExecuteScript -> MergeContent I think it would work. Once you have multiple split processors in a row, each one overwrites the fragment attributes with the most recent split information, so MergeContent can only merge back together the most recent splits.
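For context, Defragment keys off the fragment.* attributes (fragment.identifier, fragment.index, fragment.count) that split processors write, which is why a second split clobbers the first one's linkage. Conceptually it behaves something like this plain-JavaScript sketch (not NiFi code; the flowfile objects here are simple stand-ins):

```javascript
// Sketch of MergeContent's Defragment strategy: group splits by
// fragment.identifier, wait for all fragment.count pieces, then
// reassemble content in fragment.index order.
function defragment(flowfiles) {
  var groups = {};
  flowfiles.forEach(function (ff) {
    var id = ff.attributes["fragment.identifier"];
    (groups[id] = groups[id] || []).push(ff);
  });
  return Object.keys(groups).map(function (id) {
    var bin = groups[id];
    // A bin is only merged once all fragment.count pieces have arrived.
    if (bin.length !== Number(bin[0].attributes["fragment.count"])) {
      throw new Error("bin " + id + " is incomplete");
    }
    bin.sort(function (a, b) {
      return a.attributes["fragment.index"] - b.attributes["fragment.index"];
    });
    return bin.map(function (ff) { return ff.content; }).join("");
  });
}
```

A second split rewrites fragment.identifier on every piece, so the grouping step above can no longer see the outer split's bins.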

What are you doing in your ExecuteScript processor to find the problematic record? Just curious to understand that part to see if there is a different way to do the same thing.


Re: Best approach to re-merging flowfiles in NiFi

Contributor

Thank you for the reply, Bryan. Before the ExecuteScript I strip out the data that doesn't need converting, using two SplitContent processors with hardcoded hexadecimal strings to isolate the string (stored in an attribute called 'default_payload') that needs to be converted by the ExecuteScript:

var flowFile = session.get();
if (flowFile !== null) {
  var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
  var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
  flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {
    // Read the isolated payload from the attribute and write its
    // JSON-escaped form out as the new flowfile content.
    var contentObj = flowFile.getAttribute("default_payload");
    outputStream.write(JSON.stringify(contentObj).getBytes(StandardCharsets.UTF_8));
  }));
  session.transfer(flowFile, REL_SUCCESS);
}

If I understand what you're saying, the ExecuteScript would need to handle both the splitting and the conversion in the same processor so that the fragment attributes aren't overwritten, which is what keeps things synchronized in the flow? I had tried using a parent UpdateAttribute to introduce a unique value into the flowfile(s) prior to the split/conversion and the other JSON processing, but I'm thinking that's too loose a linkage for this type of processing.
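One possible workaround (a sketch in plain JavaScript, not NiFi code; the "parent."-prefixed attribute names are made up for illustration) is to copy the fragment.* attributes to differently named attributes before the second split, for example with an UpdateAttribute, and copy them back before MergeContent, so the inner split can't destroy the outer linkage:

```javascript
// Stash the outer split's fragment attributes under hypothetical
// "parent."-prefixed names before an inner split overwrites them.
function stashFragmentAttrs(attrs) {
  return Object.assign({}, attrs, {
    "parent.fragment.identifier": attrs["fragment.identifier"],
    "parent.fragment.index": attrs["fragment.index"],
    "parent.fragment.count": attrs["fragment.count"]
  });
}

// Restore the stashed values just before the Defragment-mode MergeContent.
function restoreFragmentAttrs(attrs) {
  return Object.assign({}, attrs, {
    "fragment.identifier": attrs["parent.fragment.identifier"],
    "fragment.index": attrs["parent.fragment.index"],
    "fragment.count": attrs["parent.fragment.count"]
  });
}
```

In NiFi terms the same stash/restore could be done with two UpdateAttribute processors, one on each side of the inner split.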


Re: Best approach to re-merging flowfiles in NiFi

Contributor

It occurred to me that it might be better to just convert the entire flowfile content to the stringified version rather than isolating the problematic blob first. I attempted this with the following ExecuteScript:

var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if (flowFile != null) {
  // Create a new StreamCallback, passing in a function to define the interface method
  flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {
    // Read the entire flowfile content and write back its JSON-escaped form.
    var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    outputStream.write(JSON.stringify(text).getBytes(StandardCharsets.UTF_8));
  }));
  session.transfer(flowFile, REL_SUCCESS);
}


However, the resulting content is modified: the existing JSON structure is now masked by escape characters before the double quotes, for instance. Is there an easy way to convert this object back into a JSON array? Thanks.
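In case it's useful: JSON.stringify applied to content that is already JSON adds exactly one level of quoting and escaping, and JSON.parse undoes exactly that level, so a follow-up ExecuteScript that runs the content through JSON.parse (or simply skipping the stringify when the content is already valid JSON) should recover the original structure. A quick plain-JavaScript illustration:

```javascript
// A string whose content is already JSON (like the flowfile content here).
var original = '{"items":[1,2,3],"name":"payload"}';

// Stringifying an existing JSON string wraps it in quotes and escapes the
// inner double quotes -- the masking described above.
var escaped = JSON.stringify(original);

// JSON.parse reverses exactly that one level of escaping,
// recovering the original text.
var restored = JSON.parse(escaped);
```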
