Which NiFi processor can send a flowfile to a running Java program?

I realize the program could be a web, FTP, etc. server, but I'm talking about directly & efficiently. Having to start a program over & over again seems a huge waste when throughput goes up.

Imagine calling a running Java program that is in a while() loop, looping through on each line from STDIN, using a newline as the delimiter.
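For concreteness, the kind of long-running consumer described here might look roughly like the sketch below. It is only an illustration: the class name `LineLoop` and the `handle` method are hypothetical stand-ins for the real parsing work, and nothing in it is NiFi-specific.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class LineLoop {

    // Hypothetical per-record work; stands in for the real parser.
    static String handle(String line) {
        return line.trim().toUpperCase(Locale.ROOT);
    }

    // Reads newline-delimited records until end of stream and writes one
    // result line per record. Returns the number of records handled.
    static long run(Reader source, PrintStream sink) {
        long handled = 0;
        try {
            BufferedReader reader = new BufferedReader(source);
            String line;
            // readLine() blocks while no data is available and returns null at EOF.
            while ((line = reader.readLine()) != null) {
                sink.println(handle(line));
                handled++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        sink.flush();
        return handled;
    }

    public static void main(String[] args) {
        // The process is started once and stays up until STDIN is closed.
        run(new InputStreamReader(System.in, StandardCharsets.UTF_8), System.out);
    }
}
```

The JVM starts once and any expensive setup would happen before the loop, so the per-record cost is just the call to `handle`.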

ExecuteStreamCommand seems to just call a program/script, optionally passing the flowfile on STDIN, but again, it is asking to load the program from disk & GC for every single flowfile...

Can we send data to that program lightning fast without having to set up web, SFTP, or SSH servers?

---

Again, the use case is a processor being hammered with text as flowfiles, and wanting to get that text into a Java program as fast as possible. Loading, running, and stopping the code for each and every flowfile is just too computationally expensive. I'd prefer a direct connection to an application that runs continuously and blocks when there is no data.

Your insights appreciated!

---

We're not talking about calling a static method, but this at first seemed interesting:

from here: "Answer by mburgess Apr 27 at 03:34 AM

To @Artem Ervits comment, if the Java class is available in the class loader (either by being part of the NiFi Core or being specified as a JAR in the ExecuteScript's Module Directory property), then you can call it directly, you don't need reflection.

If I'm not answering your question, can you provide more details? I'm happy to help :)"

1 ACCEPTED SOLUTION

I think there are 4 options you can consider:

  1. Use ExecuteScript with the current code on NiFi's classpath or in ExecuteScript's module directory. It doesn't matter whether it's a static method, but if it's not, the relevant class will get instantiated on each call to onTrigger, so if it's heavy, then, yeah, it'll be inefficient.
  2. Use InvokeScriptedProcessor. This is like ExecuteScript but basically lets you implement the full set of processor lifecycle methods so you'll be able to reuse resources on successive calls to onTrigger.
  3. Create a custom processor (and/or controller service) which depends on your code. Again, you'll gain control over the entire processor lifecycle, be able to reuse local resources and if you need to implement things like connection/resource pools you can create them with controller services. The NiFi Developer Guide [1] has lots of information about this. Also, there's a Maven archetype [2] available.
  4. If you really don't want to embed your code in NiFi, you can have NiFi communicate with your process via REST, TCP or its own site-to-site protocol. In fact, NiFi communicates with streaming frameworks such as Storm, Spark and Flink using its site-to-site protocol client [3], so this pattern is pretty well-established. You can check out an example of this with the Spark receiver [4].
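
As a rough illustration of the plain-TCP variant of option 4, here is a minimal sketch of a long-running Java process that accepts newline-delimited text over a socket and answers one line per record. Everything in it is an assumption made for the example: the class name `LineServer`, port 9999, the upper-casing handler, and the one-reply-per-line convention are all hypothetical, and which NiFi processor writes to the socket is left open. The `sendLine` helper is just a tiny client for trying it out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.function.UnaryOperator;

public class LineServer implements AutoCloseable {
    private final ServerSocket server;
    private final UnaryOperator<String> handler;
    private final Thread acceptor;

    private LineServer(ServerSocket server, UnaryOperator<String> handler) {
        this.server = server;
        this.handler = handler;
        this.acceptor = new Thread(this::acceptLoop, "line-server-acceptor");
        this.acceptor.setDaemon(true);
    }

    // Binds the port (0 = any free port) and starts accepting in the background.
    public static LineServer start(int port, UnaryOperator<String> handler) {
        try {
            LineServer s = new LineServer(new ServerSocket(port), handler);
            s.acceptor.start();
            return s;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public int port() {
        return server.getLocalPort();
    }

    private void acceptLoop() {
        while (!server.isClosed()) {
            try {
                Socket client = server.accept();
                Thread worker = new Thread(() -> serve(client), "line-server-client");
                worker.setDaemon(true);
                worker.start();
            } catch (IOException e) {
                // accept() throws once close() is called; the loop condition then ends the thread.
            }
        }
    }

    // One connection can carry many records; the handler is reused for all of them.
    private void serve(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = in.readLine()) != null) {
                out.println(handler.apply(line));
            }
        } catch (IOException e) {
            // Connection dropped; nothing to clean up beyond the try-with-resources.
        }
    }

    // Tiny client for trying the server out; in a flow, NiFi would play this role.
    public static String sendLine(String host, int port, String line) {
        try (Socket s = new Socket(host, port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            out.println(line);
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            server.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Port 9999 and the upper-casing handler are placeholders for the real service.
        LineServer s = start(9999, line -> line.toUpperCase(Locale.ROOT));
        s.acceptor.join(); // keep the process alive until the server socket is closed
    }
}
```

The point of the design is that anything expensive (a parser model, for instance) is built once when the process starts and is then shared by every record that arrives, which is the property the question is after.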

References:

[1] https://nifi.apache.org/developer-guide.html

[2] https://cwiki.apache.org/confluence/display/NIFI/Maven+Projects+for+Extensions

[3] https://github.com/apache/nifi/blob/master/nifi-commons/nifi-site-to-site-client/src/main/java/org/a...

[4] https://github.com/apache/nifi/blob/master/nifi-external/nifi-spark-receiver/src/main/java/org/apach...

6 REPLIES

@jfrazee

I am very impressed and grateful for your response. Until now I was thinking I'd have to use Kafka (as in a Java Kafka consumer), since NiFi works well with Kafka out of the box... I can envision 100s of events per second quite easily...

Kafka's consumer groups might help with longer-running parsers, effectively adding a pool of parsers that can then feed their results back into NiFi via Kafka again. The NAR requirements for a NiFi service and its accompanying processor look like a steep learning curve for me at this time... I owe it to you to look into each of these pointers carefully though, and over this coming week I will!

Thanks again!

---
Contributor

I'd recommend that you take a serious look at creating a custom NiFi processor. I found it to be a fairly straightforward exercise and wrote a recent post about it here: https://hortonworks.com/blog/apache-nifi-not-scratch/

---

@Paul Boal,

I found it as easy as you say with Maven on the command line. And actually, I'm at the point where the code is ready to be moved from testing into a custom processor, but... I cannot get PutKafka to work. Without running any of my code, just a:

[Screenshot: 4972-screenshot-from-2016-06-11-11-34-25.png]

With a known good Kafka running (other producers using the same topic)...

...yields:

Auto refresh started
11:17:19 EDT WARNING 2b76634c-6354-4c3c-b977-86f73d64efbc
PutKafka[id=2b76634c-6354-4c3c-b977-86f73d64efbc] Processor Administratively Yielded for 1 sec due to processing failure
11:18:00 EDT ERROR 2b76634c-6354-4c3c-b977-86f73d64efbc
PutKafka[id=2b76634c-6354-4c3c-b977-86f73d64efbc] Failed while waiting for acks from Kafka
11:18:00 EDT ERROR 2b76634c-6354-4c3c-b977-86f73d64efbc
PutKafka[id=2b76634c-6354-4c3c-b977-86f73d64efbc] PutKafka[id=2b76634c-6354-4c3c-b977-86f73d64efbc] failed to process due to java.lang.NullPointerException; rolling back session: java.lang.NullPointerException
11:18:00 EDT ERROR 2b76634c-6354-4c3c-b977-86f73d64efbc
PutKafka[id=2b76634c-6354-4c3c-b977-86f73d64efbc] PutKafka[id=2b76634c-6354-4c3c-b977-86f73d64efbc] failed to process session due to java.lang.NullPointerException: java.lang.NullPointerException

I'll post all this to another forum & thread, but I will say that when creating something new, and coming across a problem, my first [incorrect] instinct is to troubleshoot my own code... In this case, it was not my code! 🙂

---

Paul, I did! And although it was not possible in stock IntelliJ (I couldn't figure out which resource to add to the archetype repo), it DOES work with Maven on the command line, followed by loading the project in IntelliJ...

My current problem is finding time to do the same thing with controller services... they may end up expensive, but would be thread-safe... REST or other web hosting may end up working, but this will take testing, as there is no way I want to load the Stanford or Google text parsers for every request...

I'll keep everyone abreast of what I discover.

---
Explorer

Does anyone know why, or under what circumstances, NiFi passes a null flowfile to a custom NiFi processor? We are seeing cases where the session passed to the onTrigger method of our custom (Java) processor yields a null flowfile.