Created 04-27-2017 07:40 PM
Hello,
I created a custom processor and added logging to the onTrigger, init, and onScheduled methods.
But onTrigger never seems to be called. When I press Run in the NiFi GUI, I see that onScheduled is called, but not onTrigger.
Thank you for your help.
My code example:
@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
public static final PropertyDescriptor SCAN_PORT = new PropertyDescriptor.Builder().name("Port")
        .description("Scan port").defaultValue("8080").required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

public static final Relationship REL_FOUND = new Relationship.Builder().name("found")
        .description("Found host with given port").build();

private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
private static HostScanner hostScanner;

@Override
protected void init(final ProcessorInitializationContext context) {
    final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
    descriptors.add(SCAN_PORT);
    this.descriptors = Collections.unmodifiableList(descriptors);
    final Set<Relationship> relationships = new HashSet<Relationship>();
    relationships.add(REL_FOUND);
    this.relationships = Collections.unmodifiableSet(relationships);
    this.getLogger().info("[INFO] HostDiscoveryProcessor init");
    hostScanner = new HostScanner();
    hostScanner.setLogger(getLogger());
}

@Override
public Set<Relationship> getRelationships() {
    return this.relationships;
}

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return descriptors;
}

@OnScheduled
public void onScheduled(final ProcessContext context) {
    this.getLogger().info("[INFO] HostDiscoveryProcessor onScheduled");
    final String scanPort = context.getProperty(SCAN_PORT).getValue();
    hostScanner.setPort(Integer.parseInt(scanPort));
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession aSession) throws ProcessException {
    FlowFile flowFile = aSession.get();
    if (flowFile == null) {
        return;
    }
    // Logger logger = Logger.getLogger(MyClass.class.getName());
    this.getLogger().info("[INFO] HostDiscoveryProcessor onTrigger - start scan");
    List<String> ips = hostScanner.scan();
    this.getLogger().info("[INFO] Scan completed");
    flowFile = aSession.write(flowFile, new OutputStreamCallback() {
        @Override
        public void process(OutputStream aStream) throws IOException {
            aStream.write(listToBytes(ips));
        }
    });
    aSession.transfer(flowFile, REL_FOUND);
    aSession.commit();
}
Created 04-27-2017 09:04 PM
@Gu Gur Since your processor is annotated with INPUT_FORBIDDEN, no flow files can ever come in to it, so you should remove the beginning of onTrigger where you have:
FlowFile flowFile = aSession.get();
if (flowFile == null) {
    return;
}
With no incoming flow files, get() always returns null, so onTrigger returns at that point, before your first log statement. That is why it looks as if onTrigger is never called.
Instead you will want to create a new flow file like:
FlowFile flowFile = aSession.create();
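To make the difference concrete, here is a small stand-alone sketch of the two code paths. It does not use the NiFi API: FakeSession and the two trigger methods are hypothetical stand-ins, written only on the assumption that get() returns null when nothing is queued and create() always hands back a new flow file, as described above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SourceTriggerSketch {

    /** Hypothetical stand-in for a session; this is NOT NiFi's ProcessSession. */
    static final class FakeSession {
        // A processor with no incoming connections never has anything queued.
        private final Queue<String> incoming = new ArrayDeque<>();
        private int created = 0;

        /** Modeled on get(): returns null when no flow file is queued. */
        String get() {
            return incoming.poll();
        }

        /** Modeled on create(): always returns a new flow file (here just a label). */
        String create() {
            created++;
            return "flowfile-" + created;
        }
    }

    /** Mirrors the original onTrigger: exits early because get() returns null. */
    static boolean triggerWithGet(FakeSession session) {
        String flowFile = session.get();
        if (flowFile == null) {
            return false; // the scan and the log statements are never reached
        }
        return true;
    }

    /** Mirrors the suggested fix: a source processor creates its own flow file. */
    static boolean triggerWithCreate(FakeSession session) {
        String flowFile = session.create();
        return flowFile != null; // the work after this point would now run
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession();
        System.out.println("get-based trigger did work: " + triggerWithGet(session));
        System.out.println("create-based trigger did work: " + triggerWithCreate(session));
    }
}
```

Running main prints false for the get-based path and true for the create-based one. In your real processor the only change is the first lines of onTrigger; the write, transfer, and logging calls can stay where they are.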
Created 04-28-2017 07:55 AM
Thank you very much!