<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Nifi custom processor is not generating flowfile in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212235#M63163</link>
    <description>&lt;P&gt;
	Hello forum,
&lt;/P&gt;
&lt;P&gt;
	I'm trying to create a custom NiFi processor that runs Java code (the processor acts as a server and generates a file once a request is received). Everything is working fine so far, except that the processor is not sending any file to the next processor through MY_RELATIONSHIP.
&lt;/P&gt;

&lt;P&gt;
	Myprocessor code
&lt;/P&gt;
&lt;PRE&gt;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     &lt;A href="http://www.apache.org/licenses/LICENSE-2.0" target="_blank"&gt;http://www.apache.org/licenses/LICENSE-2.0&lt;/A&gt;
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package hwx.processors.demo;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import jsmpp.SMPPServerSimulator;
import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.scheduling.SchedulingStrategy;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({
    @ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({
    @WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {
    private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY")
            .displayName("SMPP Server port")
            .description("SMPP Server port")
            .required(true)
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();
    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS_RELATIONSHIP")
            .description("Success relationship")
            .build();
    private List&amp;lt;PropertyDescriptor&amp;gt; descriptors;
    private Set&amp;lt;Relationship&amp;gt; relationships;
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List&amp;lt;PropertyDescriptor&amp;gt; descriptors = new ArrayList&amp;lt;PropertyDescriptor&amp;gt;();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set&amp;lt;Relationship&amp;gt; relationships = new HashSet&amp;lt;Relationship&amp;gt;();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
        System.out.println("This is a custom processor that will receive flow file");
    }
    @Override
    public Set&amp;lt;Relationship&amp;gt; getRelationships() {
        return this.relationships;
    }
    @Override
    public final List&amp;lt;PropertyDescriptor&amp;gt; getSupportedPropertyDescriptors() {
        return descriptors;
    }
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }
    @OnStopped
    public void onStopped(final ProcessContext context) {
    }
    public SchedulingStrategy getSchedulingStrategy() {
        return schedulingStrategy.TIMER_DRIVEN;
    }
    @Override
    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
        final int port = context.getProperty(MY_PROPERTY).evaluateAttributeExpressions().asInteger();
        BasicConfigurator.configure();
        SMPPServerSimulator smppServerSim = new SMPPServerSimulator(8058, session, MY_RELATIONSHIP);
        smppServerSim.run();
    }
}
&lt;/PRE&gt;

SMPPServerSimulator.java

&lt;PRE&gt;
 public MessageId onAcceptSubmitSm(SubmitSm submitSm,
            SMPPServerSession source) throws ProcessRequestException {
        MessageId messageId = messageIDGenerator.newMessageId();
        logger.debug("\nReceiving submit_sm {}, and return message id {}\n", new String(submitSm.getShortMessage()), messageId.getValue());
        if (SMSCDeliveryReceipt.SUCCESS.containedIn(submitSm.getRegisteredDelivery()) || SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitSm.getRegisteredDelivery())) {
            execServiceDelReciept.execute(new DeliveryReceiptTask(source, submitSm, messageId));
        }
        flowfile = session.create();
        flowfile = session.putAttribute(flowfile, "match", new String(submitSm.getShortMessage()));
        System.out.println(session + " flowfile: " + flowfile + " Relation: " + SUCCESS_RELATIONSHIP);
        session.transfer(flowfile, MY_RELATIONSHIP);
        System.out.println("Message Recieved: " + new String(submitSm.getShortMessage()));
        return messageId;
    }
&lt;/PRE&gt;
&lt;P&gt;
	Below are the logs from the System.out.println call. The flowfile is created but not sent.
&lt;/P&gt;
&lt;PRE&gt;
2017-06-18 17:31:46,449 INFO [NiFi logging handler] org.apache.nifi.StdOut StandardProcessSession[id=0] flowfile: StandardFlowFileRecord[uuid=ad3128b6-0032-491e-a5ba-ee73f99b8f0b,claim=,offset=0,name=50727827275499,size=0] Relation: SUCCESS_RELATIONSHIP
&lt;/PRE&gt;

&lt;P&gt;
	Can you help, please?
&lt;/P&gt;</description>
    <pubDate>Mon, 19 Jun 2017 07:21:50 GMT</pubDate>
    <dc:creator>yahya_najjar</dc:creator>
    <dc:date>2017-06-19T07:21:50Z</dc:date>
    <item>
      <title>Nifi custom processor is not generating flowfile</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212235#M63163</link>
      <description>&lt;P&gt;
	Hello forum,
&lt;/P&gt;
&lt;P&gt;
	I'm trying to create a custom NiFi processor that runs Java code (the processor acts as a server and generates a file once a request is received). Everything is working fine so far, except that the processor is not sending any file to the next processor through MY_RELATIONSHIP.
&lt;/P&gt;

&lt;P&gt;
	Myprocessor code
&lt;/P&gt;
&lt;PRE&gt;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     &lt;A href="http://www.apache.org/licenses/LICENSE-2.0" target="_blank"&gt;http://www.apache.org/licenses/LICENSE-2.0&lt;/A&gt;
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package hwx.processors.demo;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import jsmpp.SMPPServerSimulator;
import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.scheduling.SchedulingStrategy;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({
    @ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({
    @WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {
    private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY")
            .displayName("SMPP Server port")
            .description("SMPP Server port")
            .required(true)
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();
    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS_RELATIONSHIP")
            .description("Success relationship")
            .build();
    private List&amp;lt;PropertyDescriptor&amp;gt; descriptors;
    private Set&amp;lt;Relationship&amp;gt; relationships;
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List&amp;lt;PropertyDescriptor&amp;gt; descriptors = new ArrayList&amp;lt;PropertyDescriptor&amp;gt;();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set&amp;lt;Relationship&amp;gt; relationships = new HashSet&amp;lt;Relationship&amp;gt;();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
        System.out.println("This is a custom processor that will receive flow file");
    }
    @Override
    public Set&amp;lt;Relationship&amp;gt; getRelationships() {
        return this.relationships;
    }
    @Override
    public final List&amp;lt;PropertyDescriptor&amp;gt; getSupportedPropertyDescriptors() {
        return descriptors;
    }
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }
    @OnStopped
    public void onStopped(final ProcessContext context) {
    }
    public SchedulingStrategy getSchedulingStrategy() {
        return schedulingStrategy.TIMER_DRIVEN;
    }
    @Override
    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
        final int port = context.getProperty(MY_PROPERTY).evaluateAttributeExpressions().asInteger();
        BasicConfigurator.configure();
        SMPPServerSimulator smppServerSim = new SMPPServerSimulator(8058, session, MY_RELATIONSHIP);
        smppServerSim.run();
    }
}
&lt;/PRE&gt;

SMPPServerSimulator.java

&lt;PRE&gt;
 public MessageId onAcceptSubmitSm(SubmitSm submitSm,
            SMPPServerSession source) throws ProcessRequestException {
        MessageId messageId = messageIDGenerator.newMessageId();
        logger.debug("\nReceiving submit_sm {}, and return message id {}\n", new String(submitSm.getShortMessage()), messageId.getValue());
        if (SMSCDeliveryReceipt.SUCCESS.containedIn(submitSm.getRegisteredDelivery()) || SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitSm.getRegisteredDelivery())) {
            execServiceDelReciept.execute(new DeliveryReceiptTask(source, submitSm, messageId));
        }
        flowfile = session.create();
        flowfile = session.putAttribute(flowfile, "match", new String(submitSm.getShortMessage()));
        System.out.println(session + " flowfile: " + flowfile + " Relation: " + SUCCESS_RELATIONSHIP);
        session.transfer(flowfile, MY_RELATIONSHIP);
        System.out.println("Message Recieved: " + new String(submitSm.getShortMessage()));
        return messageId;
    }
&lt;/PRE&gt;
&lt;P&gt;
	Below are the logs from the System.out.println call. The flowfile is created but not sent.
&lt;/P&gt;
&lt;PRE&gt;
2017-06-18 17:31:46,449 INFO [NiFi logging handler] org.apache.nifi.StdOut StandardProcessSession[id=0] flowfile: StandardFlowFileRecord[uuid=ad3128b6-0032-491e-a5ba-ee73f99b8f0b,claim=,offset=0,name=50727827275499,size=0] Relation: SUCCESS_RELATIONSHIP
&lt;/PRE&gt;

&lt;P&gt;
	Can you help, please?
&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2017 07:21:50 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212235#M63163</guid>
      <dc:creator>yahya_najjar</dc:creator>
      <dc:date>2017-06-19T07:21:50Z</dc:date>
    </item>
    <item>
      <title>Re: Nifi custom processor is not generating flowfile</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212236#M63164</link>
      <description>&lt;P&gt;Is SMPPServerSimulator.run() an asynchronous method, meaning it returns immediately? If so, then the framework may have already committed your session (after returning from the onTrigger() call in AbstractProcessor), in which case the transfer() either didn't work (but it was too late to throw a runtime exception up through to the framework) or it did work but the session now needs another call to commit(). These types of processors that use a separate thread/lifecycle to manage I/O are tricky to integrate into the NiFi architecture; it takes great care to make sure operations are performed in an order that is consistent with the behavior of both the separate entity and the NiFi framework.&lt;/P&gt;&lt;P&gt;If the run() method is synchronous, then there is something else going on, but it still seems related to the session not being committed. Can you attach a debugger and see if session.commit() gets called after your session.transfer()?&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2017 08:03:49 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212236#M63164</guid>
      <dc:creator>mburgess</dc:creator>
      <dc:date>2017-06-19T08:03:49Z</dc:date>
    </item>
    <item>
      <title>Re: Nifi custom processor is not generating flowfile</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212237#M63165</link>
      <description>&lt;P&gt;Hello Matt,&lt;/P&gt;&lt;P&gt;You are right: I added "session.commit()" and now files are generated. Thanks &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;&lt;P&gt;What is the method that is triggered/called when I stop the processor? I want to stop the SMPP process when I stop the processor.&lt;/P&gt;&lt;P&gt;Thanks a lot&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2017 22:25:55 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212237#M63165</guid>
      <dc:creator>yahya_najjar</dc:creator>
      <dc:date>2017-06-19T22:25:55Z</dc:date>
    </item>
    <item>
      <title>Re: Nifi custom processor is not generating flowfile</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212238#M63166</link>
      <description>&lt;P&gt;Hello Matt,&lt;/P&gt;&lt;P&gt;You are right: I added "session.commit()" and now files are generated. Thanks &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;&lt;P&gt;What is the method that is triggered/called when I stop the processor? I want to stop the SMPP process when I stop the processor.&lt;/P&gt;&lt;P&gt;Thanks a lot&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2017 22:26:16 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212238#M63166</guid>
      <dc:creator>yahya_najjar</dc:creator>
      <dc:date>2017-06-19T22:26:16Z</dc:date>
    </item>
    <item>
      <title>Re: Nifi custom processor is not generating flowfile</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212239#M63167</link>
      <description>&lt;P&gt;You can annotate a method (with zero or one arguments, the one being a ProcessContext) with &lt;A target="_blank" href="https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#onstopped"&gt;@OnStopped&lt;/A&gt; that will get called when the processor is stopped. See the Component Lifecycle section of the Developer Guide for more details.&lt;/P&gt;</description>
      <pubDate>Tue, 20 Jun 2017 01:24:53 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Nifi-custom-processor-is-not-generating-flowfile/m-p/212239#M63167</guid>
      <dc:creator>mburgess</dc:creator>
      <dc:date>2017-06-20T01:24:53Z</dc:date>
    </item>
  </channel>
</rss>

