Created 06-19-2017 12:21 AM
Hello forum,
I'm trying to create a custom NiFi processor that runs Java code (the processor acts as a server and generates a file once a request is received). Everything works so far, except that the processor does not pass any file to the next processor through MY_RELATIONSHIP.
MyProcessor.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package hwx.processors.demo;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import jsmpp.SMPPServerSimulator;
import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.scheduling.SchedulingStrategy;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({
    @ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({
    @WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {

    private SchedulingStrategy schedulingStrategy; // guarded by read/write lock

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY")
            .displayName("SMPP Server port")
            .description("SMPP Server port")
            .required(true)
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();

    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS_RELATIONSHIP")
            .description("Success relationship")
            .build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);

        System.out.println("This is a custom processor that will receive flow file");
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    @OnStopped
    public void onStopped(final ProcessContext context) {
    }

    public SchedulingStrategy getSchedulingStrategy() {
        return schedulingStrategy.TIMER_DRIVEN;
    }

    @Override
    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
        final int port = context.getProperty(MY_PROPERTY).evaluateAttributeExpressions().asInteger();
        BasicConfigurator.configure();
        SMPPServerSimulator smppServerSim = new SMPPServerSimulator(8058, session, MY_RELATIONSHIP);
        smppServerSim.run();
    }
}

SMPPServerSimulator.java
public MessageId onAcceptSubmitSm(SubmitSm submitSm, SMPPServerSession source) throws ProcessRequestException {
    MessageId messageId = messageIDGenerator.newMessageId();
    logger.debug("\nReceiving submit_sm {}, and return message id {}\n",
            new String(submitSm.getShortMessage()), messageId.getValue());

    if (SMSCDeliveryReceipt.SUCCESS.containedIn(submitSm.getRegisteredDelivery())
            || SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitSm.getRegisteredDelivery())) {
        execServiceDelReciept.execute(new DeliveryReceiptTask(source, submitSm, messageId));
    }

    flowfile = session.create();
    flowfile = session.putAttribute(flowfile, "match", new String(submitSm.getShortMessage()));
    System.out.println(session + " flowfile: " + flowfile + " Relation: " + SUCCESS_RELATIONSHIP);
    session.transfer(flowfile, MY_RELATIONSHIP);
    System.out.println("Message Recieved: " + new String(submitSm.getShortMessage()));
    return messageId;
}
The log output of the System.out.println call is below. The flowfile is created but not sent:
2017-06-18 17:31:46,449 INFO [NiFi logging handler] org.apache.nifi.StdOut StandardProcessSession[id=0] flowfile: StandardFlowFileRecord[uuid=ad3128b6-0032-491e-a5ba-ee73f99b8f0b,claim=,offset=0,name=50727827275499,size=0] Relation: SUCCESS_RELATIONSHIP
Can you help, please?
Created 06-19-2017 01:03 AM
Is SMPPServerSimulator.run() an asynchronous method, meaning it returns immediately? If so, then the framework may have already committed your session (after returning from the onTrigger() call in AbstractProcessor), in which case the transfer() either didn't work (but it was too late to throw a runtime exception up through to the framework) or it did work but the session now needs another call to commit(). Processors that use a separate thread/lifecycle to manage I/O are tricky to integrate into the NiFi architecture. It takes great care to make sure operations are performed in an order that is consistent with the behavior of both the separate entity and the NiFi framework.
If the run() method is synchronous, then there is something else going on, but it still seems related to the session not being committed. Can you attach a debugger and see if session.commit() gets called after your session.transfer()?
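To illustrate the second possibility (the transfer is accepted but nothing commits it), here is a minimal plain-Java sketch. ToySession and CommitDemo are hypothetical stand-ins written for this example, not NiFi classes; the only behaviour they model is the assumption that transfers stay staged in a session until commit() is called.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a session: transfers are staged until commit(). */
class ToySession {
    private final List<String> staged = new ArrayList<>();
    private final List<String> downstream;

    ToySession(List<String> downstream) {
        this.downstream = downstream;
    }

    /** Stages a flowfile; nothing reaches the downstream queue yet. */
    void transfer(String flowFile) {
        staged.add(flowFile);
    }

    /** Publishes everything staged so far to the downstream queue. */
    void commit() {
        downstream.addAll(staged);
        staged.clear();
    }
}

class CommitDemo {
    /** Returns the downstream queue size before and after the extra commit. */
    static int[] run() {
        List<String> downstream = new ArrayList<>();
        ToySession session = new ToySession(downstream);

        // Models an asynchronous server callback that fires after onTrigger() returned.
        Thread callback = new Thread(() -> session.transfer("flowfile-1"));

        // Models the framework committing right after onTrigger() returns:
        // nothing is staged yet, so this commit publishes nothing.
        session.commit();

        callback.start();
        try {
            callback.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        int before = downstream.size(); // transfer happened, but was never committed
        session.commit();               // the extra commit made by the callback's owner
        int after = downstream.size();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("downstream before extra commit: " + sizes[0]);
        System.out.println("downstream after extra commit: " + sizes[1]);
    }
}
```

In this model the downstream queue stays empty until the second commit() runs, which matches the symptom of a flowfile that is created and transferred but never shows up in the next processor.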
Created 06-19-2017 03:26 PM
Hello Matt,
You are right. I added session.commit() and now files are generated. Thanks 🙂
Which method is called when I stop the processor? I want to stop the SMPP process when the processor is stopped.
Thanks a lot
Created 06-19-2017 06:24 PM
You can annotate a method with @OnStopped (it may take zero arguments, or one argument of type ProcessContext), and it will be called when the processor is stopped. See the Component Lifecycle section of the Developer Guide for more details.
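As a rough sketch of what such a stop hook usually has to do, the plain-Java example below starts a background worker in one method and shuts it down in another. BackgroundServer and ProcessorLifecycleSketch are hypothetical names invented for this example; in a real processor the two methods would carry the @OnScheduled and @OnStopped annotations, and the worker would be the SMPP server, whose actual shutdown call is not shown in this thread.

```java
/** Hypothetical background worker standing in for the SMPP server thread. */
class BackgroundServer implements Runnable {
    private volatile boolean running = true;
    private Thread thread;

    void start() {
        thread = new Thread(this, "background-server");
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(50); // placeholder for waiting on incoming requests
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted by stop(): exit the loop
        }
    }

    /** Signals the worker to finish and waits up to one second for it to exit. */
    void stop() {
        running = false;
        if (thread == null) {
            return;
        }
        thread.interrupt();
        try {
            thread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isAlive() {
        return thread != null && thread.isAlive();
    }
}

class ProcessorLifecycleSketch {
    private BackgroundServer server;

    // In a NiFi processor this method would be annotated with @OnScheduled.
    void onScheduled() {
        server = new BackgroundServer();
        server.start();
    }

    // In a NiFi processor this method would be annotated with @OnStopped.
    void onStopped() {
        if (server != null) {
            server.stop();
        }
    }

    boolean isServerRunning() {
        return server != null && server.isAlive();
    }

    public static void main(String[] args) {
        ProcessorLifecycleSketch processor = new ProcessorLifecycleSketch();
        processor.onScheduled();
        System.out.println("running after start: " + processor.isServerRunning());
        processor.onStopped();
        System.out.println("running after stop: " + processor.isServerRunning());
    }
}
```

Starting the server in the scheduled hook rather than in onTrigger() also keeps each start paired with exactly one stop, which is an assumption about how you would want to structure it rather than something stated above.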