Nifi Custom processor

New Contributor

I am writing my first custom processor, following the example at https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/


In the processor settings, I point it at the file /tmp/jsoniput/sample1.json.


When I start the processor, I get the error below. Any pointers or advice?


2019-07-13 23:27:19,572 WARN [Timer-Driven Process Thread-7] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=ee6c4800-016b-1000-4a6e-498f0b95e559] sample - Farooq - 2

2019-07-13 23:27:19,588 ERROR [Timer-Driven Process Thread-7] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=ee6c4800-016b-1000-4a6e-498f0b95e559] JsonProcessor[id=ee6c4800-016b-1000-4a6e-498f0b95e559] failed to process session due to java.lang.NullPointerException; Processor Administratively Yielded for 1 sec: java.lang.NullPointerException

java.lang.NullPointerException: null

at org.apache.nifi.controller.repository.StandardProcessSession.getRecord(StandardProcessSession.java:574)

at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3132)

at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2187)

at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2180)


Here is the basic flow: 109927-nifi-jsonprocessor.png




Re: Nifi Custom processor

New Contributor

The code for onTrigger:

public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

  final AtomicReference<String> value = new AtomicReference<>();
  getLogger().warn("sample - Farooq - 1 , value = <" + value.toString() + ">");

  FlowFile flowfile = session.get();
  getLogger().warn("sample - Farooq - 2");

  if ( null == flowfile) {
    getLogger().warn("sample - Farooq - 2.1 - flowfile is NULL *** ");
    flowfile = session.create();
    flowfile = session.get();
    getLogger().warn("sample - Farooq - 2.2 - session.create() / session.get() --- NOT SURE ");
  }
  getLogger().warn("sample - Farooq - 2.3 JSON_PATH :" + context.getProperty(JSON_PATH).getValue());

  session.read(flowfile, new InputStreamCallback() {
    @Override
    public void process(InputStream in) throws IOException {

      try {
        getLogger().warn("sample - Farooq - 3 JSON_PATH :" + context.getProperty(JSON_PATH).getValue());
        Object obj = new JSONParser().parse(new FileReader("JSON_PATH"));
        getLogger().warn("sample - Farooq - 3.1 ");


The log output:

2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 1 , value = <null>

2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2

2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.1 - flowfile is NULL ***

2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.2 - session.create() / session.get() --- NOT SURE

2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.3 JSON_PATH :/tmp/jsoninput

2019-07-20 13:58:42,626 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] failed to process session due to java.lang.NullPointerException; Processor Administratively Yielded for 1 sec: java.lang.NullPointerException

java.lang.NullPointerException: null

at org.apache.nifi.controller.repository.StandardProcessSession.getRecord(StandardProcessSession.java:574)

at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3132)

at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2187)
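
A note on reading this trace: the exception is raised inside session.read() itself, and log message 3 (the first line of the callback) never appears, which is consistent with flowfile being null when read() is called. In the posted code the flowfile returned by session.create() is immediately overwritten by a second session.get(), and get() returns null when nothing is queued. The sketch below models only that control flow. The FlowFile and Session classes are hypothetical stand-ins written so the example compiles without nifi-api; they are not the NiFi classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Stand-in types: they only mirror the shape of ProcessSession.get()/create().
// They are NOT the NiFi classes and exist so this sketch runs on its own.
final class NullFlowFileSketch {

    /** Hypothetical stand-in for a flowfile handle. */
    static final class FlowFile {
        final String label;
        FlowFile(String label) { this.label = label; }
    }

    /** Hypothetical stand-in for a session backed by an incoming queue. */
    static final class Session {
        private final Deque<FlowFile> incoming = new ArrayDeque<>();

        void enqueue(FlowFile f) { incoming.add(f); }

        /** Returns the next queued flowfile, or null when nothing is queued. */
        FlowFile get() { return incoming.poll(); }

        /** Creates a new, empty flowfile. */
        FlowFile create() { return new FlowFile("created"); }
    }

    /** Pattern from the post: the created flowfile is overwritten by a second get(). */
    static FlowFile asPosted(Session session) {
        FlowFile flowfile = session.get();
        if (flowfile == null) {
            flowfile = session.create();
            flowfile = session.get(); // queue is still empty, so this is null again
        }
        return flowfile;
    }

    /** Variant that keeps the created flowfile instead of overwriting it. */
    static FlowFile keepCreated(Session session) {
        FlowFile flowfile = session.get();
        if (flowfile == null) {
            flowfile = session.create();
        }
        return flowfile;
    }
}
```

Which variant fits depends on the flow. If the processor has no incoming connection, keeping the created flowfile is one option. If it is meant to consume flowfiles from upstream, a common pattern is to return from onTrigger when session.get() yields null rather than creating anything.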


Re: Nifi Custom processor

New Contributor

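You pass the literal string "JSON_PATH" to FileReader instead of the value of the JSON_PATH property, so the file cannot be resolved. On its own that would surface as a FileNotFoundException; the NullPointerException in the trace is thrown by session.read(), which receives a null flowfile because the result of session.create() is overwritten by the second session.get().

For the path, it should look like this:

String jsonPath = context.getProperty(JSON_PATH).getValue();
Object obj = new JSONParser().parse(new FileReader(jsonPath));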
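
To see the difference between the literal and the resolved value in isolation, here is a small stand-alone sketch using only the JDK. The class and method names are made up for illustration, and a temporary file stands in for whatever the JSON_PATH property is configured to. Note also that the second log prints /tmp/jsoninput for the property, which looks like a directory rather than the sample1.json file mentioned in the question; FileReader needs the path of a regular file.

```java
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative helper, not part of the processor from the post.
final class JsonPathLiteralDemo {

    /** Tries to open the given path and reports what happened. */
    static String tryOpen(String path) {
        try (Reader reader = new FileReader(path)) {
            return "opened";
        } catch (FileNotFoundException e) {
            // Thrown when the path does not exist or is not a readable regular file.
            return "FileNotFoundException";
        } catch (IOException e) {
            return "IOException";
        }
    }

    /** Creates a throwaway JSON file standing in for the configured property value. */
    static String createSampleFile() {
        try {
            Path file = Files.createTempFile("sample", ".json");
            Files.write(file, "{\"name\":\"value\"}".getBytes(StandardCharsets.UTF_8));
            file.toFile().deleteOnExit();
            return file.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling tryOpen with the path returned by createSampleFile() opens the file, while tryOpen("JSON_PATH") looks for a file literally named JSON_PATH in the working directory and, assuming none exists there, fails with a FileNotFoundException.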