Support Questions
Find answers, ask questions, and share your expertise

Output flow files vanish from ExecuteScript running ECMAScript.

Explorer

I need help with a NiFi (1.4.0) mystery. I have a flow file (a JSON config, shown below) consumed by an ExecuteScript processor running the ECMAScript listed below.

In short, the idea is to emit the Atom feed URLs at a set frequency, so that the downstream InvokeHTTP processor can connect and pull data for processing.

The script sets up the schedule and triggers correctly; however, the flow file (containing the URL) generated by the script vanishes. There is no error or any trace of the flow file created each time the function is triggered.

Any help getting the output flow files to the next processor in the flow (REL_SUCCESS) is much appreciated. Thank you.

Input flowfile:

{
    "dataSource":[{
        "name": "My Atom Feed - #1",
        "sourceURL": "http://xyz.com/1",
        "protocol": "http",
        "format": "xml",
        "feed": "atom",
        "recordType": "workrecord",
        "frequencyInSeconds": "2",
        "batchSize": "100",
        "maxRetries": "0"
    },
    {
        "name": "My Atom Feed - #2",
        "sourceURL": "http://xyz.com/2",
        "protocol": "http",
        "format": "xml",
        "feed": "atom",
        "recordType": "client",
        "frequencyInSeconds": "10",
        "batchSize": "100",
        "maxRetries": "1"
    },
    {
        "name": "My Atom Feed - #3",
        "sourceURL": "http://xyz.com/3",
        "protocol": "http",
        "format": "xml",
        "feed": "atom",
        "recordType": "service",
        "frequencyInSeconds": "20",
        "batchSize": "100",
        "maxRetries": "2"
    }]
}

ECMAScript:

/*
 * Minimal setTimeout/clearTimeout/setInterval/clearInterval polyfill for
 * Nashorn, installed on `context` (the script's global object). Callbacks run
 * on a java.util.Timer thread named 'amberIngestScheduler'.
 *
 * The Timer (and its Phaser) are created lazily on the first setTimeout call,
 * so evaluations that never schedule anything do not start a thread. When the
 * last outstanding timeout finishes or is cancelled, the Timer is cancelled
 * and discarded; the next setTimeout creates a fresh one.
 */
(function(context) {
    'use strict';
    log.info("Starting....");
    var Timer = Java.type('java.util.Timer');
    var Phaser = Java.type('java.util.concurrent.Phaser');

    var timer = null;
    var phaser = null;

    // Number of timeouts scheduled but not yet run or cancelled.
    var timeoutStack = 0;

    function ensureScheduler() {
      if (timer === null) {
        timer = new Timer('amberIngestScheduler', false);
        phaser = new Phaser();
      }
    }

    function pushTimeout() {
      timeoutStack++;
    }

    function popTimeout() {
      timeoutStack--;
      if (timeoutStack > 0) {
        return;
      }
      // Last outstanding timeout is done: stop the timer thread and drop the
      // references so a later setTimeout does not hit "Timer already cancelled".
      timer.cancel();
      phaser.forceTermination();
      timer = null;
      phaser = null;
    }

    /**
     * Run fn(args...) once after `millis` ms on the timer thread.
     * @returns {Function} cancel function; safe to call any number of times,
     *   including after the task has already run.
     */
    context.setTimeout = function(fn, millis /* [, args...] */) {
      var args = [].slice.call(arguments, 2, arguments.length);

      ensureScheduler();
      var taskPhaser = phaser;
      taskPhaser.register();

      // Becomes true once the task has run or been cancelled; guarantees the
      // deregister/pop bookkeeping happens exactly once per timeout.
      var settled = false;
      function settle() {
        if (settled) {
          return;
        }
        settled = true;
        taskPhaser.arriveAndDeregister();
        popTimeout();
      }

      // Count the timeout before scheduling so a task that fires immediately
      // cannot pop the stack to zero and cancel the timer prematurely.
      pushTimeout();

      timer.schedule(function() {
        if (settled) {
          return;
        }

        try {
          fn.apply(context, args);
        } catch (e) {
          // Log through NiFi so callback failures are visible, not lost on stdout.
          log.error('Timer callback failed: ' + e);
        } finally {
          settle();
        }
      }, millis);

      return function() {
        settle();
      };
    };

    context.clearTimeout = function(cancel) {
      cancel();
    };

    /**
     * Run fn(args...) every `delay` ms. The next run is scheduled before the
     * current one executes.
     * @returns {Function} cancel function that stops future runs.
     */
    context.setInterval = function(fn, delay /* [, args...] */) {
      var args = [].slice.call(arguments, 2, arguments.length);

      var cancel = null;

      var loop = function() {
        cancel = context.setTimeout(loop, delay);
        fn.apply(context, args);
      };

      cancel = context.setTimeout(loop, delay);
      return function() {
        cancel();
      };
    };

    context.clearInterval = function(cancel) {
      cancel();
    };

  })(this);


// Entry point: read the JSON datapipe config from the incoming flow file and
// start one interval per conf.dataSource[] entry, firing every
// frequencyInSeconds seconds. The parent flow file is removed on success, or
// routed to REL_FAILURE if the config cannot be loaded.
//
// NOTE(review): ExecuteScript commits this session when the script returns,
// but the intervals keep using it from the timer thread afterwards. Flow files
// created in those callbacks are only delivered if the callback commits the
// session itself. A long-lived scheduler like this fits InvokeScriptedProcessor
// better than ExecuteScript.
var parentFlowFile = session.get();


if (parentFlowFile !== null) {

    var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

    var isError = false;
    var inFlowFileJSON = null;
    session.read(parentFlowFile, new InputStreamCallback(function (inputStream) {
        inFlowFileJSON = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    }));

    var timer = null;
    // Cancel handles of every interval started for this config.
    var timerHandles = [];

    try {
        var conf = JSON.parse(inFlowFileJSON);

        for (var i = 0; i < conf.dataSource.length; i++) {
            var frequency = conf.dataSource[i].frequencyInSeconds;
            var periodMillis = parseInt(frequency, 10) * 1000;
            // Reject NaN/0/negative: a 0 delay would fire continuously.
            if (!(periodMillis > 0)) {
                throw new Error('invalid frequencyInSeconds for dataSource[' + i + ']: ' + frequency);
            }
            timer = setInterval(publishFlowFile, periodMillis, conf, i, session);
            timerHandles.push(timer);
        }

    } catch (error) {
        log.error('Issue loading the config for datapipe, ' + error);
        isError = true;
        // Stop intervals already started for earlier entries, so a rejected
        // config does not keep emitting flow files.
        timerHandles.forEach(function (cancel) {
            clearInterval(cancel);
        });
        timerHandles = [];
    }

    if (isError) {
        log.info("Moving the parent flowfile to the Failure path....");
        session.transfer(parentFlowFile, REL_FAILURE);
    } else {
        log.info("Removing the parent flowfile....");
        session.remove(parentFlowFile);
    }

}


/**
 * Interval callback: create one flow file whose content is the JSON-encoded
 * conf.dataSource[i].sourceURL, route it to REL_SUCCESS and commit the session.
 *
 * This runs on the timer thread after the ExecuteScript invocation has
 * returned, so the commit here is what actually hands the flow file to the
 * next processor. Failures are logged at error level. A flow file that was
 * created but not transferred is removed, so it does not break later commits.
 *
 * @param {Object} conf - parsed config; reads conf.dataSource[i].sourceURL
 * @param {number} i - index of the data source to publish
 * @param {Object} session - the NiFi ProcessSession captured when the interval was scheduled
 */
function publishFlowFile(conf, i, session) {
    log.debug('' + session);
    var outFlowFile = null;
    var transferred = false;
    try {
        log.info("Sending file...."+i);
        var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
        // Resolved locally instead of relying on the global declared by the main script.
        var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

        outFlowFile = session.create();
        outFlowFile = session.write(outFlowFile, new OutputStreamCallback(function (outputStream) {
            outputStream.write(JSON.stringify(conf.dataSource[i].sourceURL).getBytes(StandardCharsets.UTF_8));
        }));
        log.debug('' + outFlowFile);
        session.transfer(outFlowFile, REL_SUCCESS);
        transferred = true;
        session.commit();
    } catch (err) {
        log.error('Failed to publish flow file for dataSource[' + i + ']: ' + err);
        if (outFlowFile !== null && !transferred) {
            session.remove(outFlowFile);
        }
    }
}



2 REPLIES 2

Super Guru

I can't tell from your code where the main script waits for the timers to be called. If the main script returns, the timer callbacks are likely not getting invoked, and your flow files will not be processed. I think you'd have to make the main script wait for the number of callbacks you want invoked. In general, though, for this kind of processing you'll want an implementation in InvokeScriptedProcessor rather than ExecuteScript. The former gives you more control over the lifecycle of the processor, allowing for more complicated structures and flows such as timers, callbacks, and other async methods. To help port your code from ExecuteScript to InvokeScriptedProcessor (ISP), see my blog post for an ECMAScript skeleton in ISP.

Explorer

Thanks, @matt burgess, for getting back to me on this. Can I use nifi-script-tester to test an InvokeScriptedProcessor running ECMAScript?

; ;