Support Questions

Find answers, ask questions, and share your expertise

nifi custom processor - onTrigger

avatar
Explorer

Hello,

I created custom processor, added loging for onTrigger, init, onScheduled methods.

But onTrigger never called. When I press Run in Nifi GUI, I see onScheduled method called, but not onTrigger.

Thank you for your help.

My code example:

@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)



	public static final PropertyDescriptor SCAN_PORT = new PropertyDescriptor.Builder().name("Port")
			.description("Scan port").defaultValue("8080").required(true)
			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();


	public static final Relationship REL_FOUND = new Relationship.Builder().name("found")
			.description("Found host with given port").build();


	private List<PropertyDescriptor> descriptors;


	private Set<Relationship> relationships;


	private static HostScanner hostScanner;


	@Override
	protected void init(final ProcessorInitializationContext context) {
		final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
		descriptors.add(SCAN_PORT);
		this.descriptors = Collections.unmodifiableList(descriptors);


		final Set<Relationship> relationships = new HashSet<Relationship>();
		relationships.add(REL_FOUND);
		this.relationships = Collections.unmodifiableSet(relationships);
		this.getLogger().info("[INFO] HostDiscoveryProcessor init");


		hostScanner = new HostScanner();
		hostScanner.setLogger(getLogger());
	}


	@Override
	public Set<Relationship> getRelationships() {
		return this.relationships;
	}


	@Override
	public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
		return descriptors;
	}


	@OnScheduled
	public void onScheduled(final ProcessContext context) {
		this.getLogger().info("[INFO] HostDiscoveryProcessor onScheduled");
		final String scanPort = context.getProperty(SCAN_PORT).getValue();
		hostScanner.setPort(Integer.parseInt(scanPort));


	}


	@Override
	public void onTrigger(final ProcessContext context, final ProcessSession aSession) throws ProcessException {
		FlowFile flowFile = aSession.get();
		if (flowFile == null) {
			return;
		}
		// Logger logger = Logger.getLogger(MyClass.class.getName());
		this.getLogger().info("[INFO] HostDiscoveryProcessor onTrigger - start scan");
		List<String> ips = hostScanner.scan();
		this.getLogger().info("[INFO] Scan completed");
		
		flowFile = aSession.write(flowFile, new OutputStreamCallback() {			
			@Override
			public void process(OutputStream aStream) throws IOException {
				 aStream.write(listToBytes(ips));
				
			}
		});
		aSession.transfer(flowFile, REL_FOUND);
		aSession.commit();
	}
1 ACCEPTED SOLUTION

avatar
Master Guru

@Gu Gur Since your processor has INPUT_FORBIDDEN, you can never have flow files coming in to your processor, so you should remove the beginning of onTrigger where you have:

FlowFile flowFile = aSession.get();
if(flowFile ==null){
return;
}

That will always return and exit your onTrigger because there are no flow files.

Instead you will want to create a new flow file like:

FlowFile flowFile = session.create();

View solution in original post

2 REPLIES 2

avatar
Master Guru

@Gu Gur Since your processor has INPUT_FORBIDDEN, you can never have flow files coming in to your processor, so you should remove the beginning of onTrigger where you have:

FlowFile flowFile = aSession.get();
if(flowFile ==null){
return;
}

That will always return and exit your onTrigger because there are no flow files.

Instead you will want to create a new flow file like:

FlowFile flowFile = session.create();

avatar
Explorer

Thank you very much!