Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Nifi custom processor is not generating flowfile

avatar
Rising Star

Hello forum,

I'm trying to create custom Nifi processor to run Java code inside (processor to act as server to generate file once a request is received). All is working fine so far except that processor is not generating any file to the next processor through MY_RELATIONSHIP

Myprocessor code

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package hwx.processors.demo;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import jsmpp.SMPPServerSimulator;
import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.scheduling.SchedulingStrategy;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({
    @ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({
    @WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {
    private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY")
            .displayName("SMPP Server port")
            .description("SMPP Server port")
            .required(true)
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();
    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS_RELATIONSHIP")
            .description("Success relationship")
            .build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
        System.out.println("This is a custom processor that will receive flow file");
    }
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }
    @OnStopped
    public void onStopped(final ProcessContext context) {
    }
    public SchedulingStrategy getSchedulingStrategy() {
        return schedulingStrategy.TIMER_DRIVEN;
    }
    @Override
    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
        final int port = context.getProperty(MY_PROPERTY).evaluateAttributeExpressions().asInteger();
        BasicConfigurator.configure();
        SMPPServerSimulator smppServerSim = new SMPPServerSimulator(8058, session, MY_RELATIONSHIP);
        smppServerSim.run();
    }
}
SMPPServerSimulator.java
 public MessageId onAcceptSubmitSm(SubmitSm submitSm,
            SMPPServerSession source) throws ProcessRequestException {
        MessageId messageId = messageIDGenerator.newMessageId();
        logger.debug("\nReceiving submit_sm {}, and return message id {}\n", new String(submitSm.getShortMessage()), messageId.getValue());
        if (SMSCDeliveryReceipt.SUCCESS.containedIn(submitSm.getRegisteredDelivery()) || SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitSm.getRegisteredDelivery())) {
            execServiceDelReciept.execute(new DeliveryReceiptTask(source, submitSm, messageId));
        }
        flowfile = session.create();
        flowfile = session.putAttribute(flowfile, "match", new String(submitSm.getShortMessage()));
        System.out.println(session + " flowfile: " + flowfile + " Relation: " + SUCCESS_RELATIONSHIP);
        session.transfer(flowfile, MY_RELATIONSHIP);
        System.out.println("Message Recieved: " + new String(submitSm.getShortMessage()));
        return messageId;
    }

Below Logs below for the System.out.println command. flowfile is created but not sent

p.p1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 11.0px Menlo; color: #000000; background-color: #ffffff}
span.s1 {font-variant-ligatures: no-common-ligatures}2017-06-18 17:31:46,449 INFO [NiFi logging handler] org.apache.nifi.StdOut StandardProcessSession[id=0] flowfile: StandardFlowFileRecord[uuid=ad3128b6-0032-491e-a5ba-ee73f99b8f0b,claim=,offset=0,name=50727827275499,size=0] Relation: SUCCESS_RELATIONSHIP

can you help please..

1 ACCEPTED SOLUTION

avatar
Master Guru

Is SMPPServerSimulator.run() an asynchronous method, meaning it returns immediately? If so, then the framework may have already committed your session (after returning from the onTrigger() call in AbstractProcessor), in which case the transfer() either didn't work (but it was too late to throw a runtime exception up through to the framework) or it did work but the session now needs another call to commit(). These types of processors that use a separate thread/lifecycle to manage I/O are tricky to integrate into the NiFi architecture, it takes great care to make sure operations are performed in an order that is consistent with the behavior of both the separate entity and the NiFi framework.

If the run() method is synchronous, then there is something else going on, but it still seems related to the session not being committed. Can you attach a debugger and see if sesson.commit() gets called after your session.transfer()?

View solution in original post

4 REPLIES 4

avatar
Master Guru

Is SMPPServerSimulator.run() an asynchronous method, meaning it returns immediately? If so, then the framework may have already committed your session (after returning from the onTrigger() call in AbstractProcessor), in which case the transfer() either didn't work (but it was too late to throw a runtime exception up through to the framework) or it did work but the session now needs another call to commit(). These types of processors that use a separate thread/lifecycle to manage I/O are tricky to integrate into the NiFi architecture, it takes great care to make sure operations are performed in an order that is consistent with the behavior of both the separate entity and the NiFi framework.

If the run() method is synchronous, then there is something else going on, but it still seems related to the session not being committed. Can you attach a debugger and see if sesson.commit() gets called after your session.transfer()?

avatar
Rising Star

Hello Matt,

You are right, I added "sesson.commit()" and now files are generated. Thanks 🙂

what is the method that is triggered/called when I stop the processor ? To stop the SMPP process when I stop the processor !

Thanks a lot

avatar
Rising Star

Hello Matt,

You are right, I added "sesson.commit()" and now files are generated. Thanks 🙂

what is the method that is triggered/called when I stop the processor ? To stop the SMPP process when I stop the processor !

Thanks a lot

avatar
Master Guru

You can annotate a method (with zero or one arguments, the one being a ProcessContext) with @OnStopped that will get called when the processor is stopped. See the Component Lifecycle section of the Developer Guide for more details.