Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.

NiFi custom processor - onTrigger

New Member

Hello,

I created a custom processor and added logging to the onTrigger, init, and onScheduled methods.

But the onTrigger logging never appears. When I press Run in the NiFi GUI, I see the onScheduled method being called, but not onTrigger.

Thank you for your help.

My code example:

@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
// The class line was not in the post; the name is taken from the log messages
// and AbstractProcessor is assumed from the init() override.
public class HostDiscoveryProcessor extends AbstractProcessor {

	public static final PropertyDescriptor SCAN_PORT = new PropertyDescriptor.Builder().name("Port")
			.description("Scan port").defaultValue("8080").required(true)
			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

	public static final Relationship REL_FOUND = new Relationship.Builder().name("found")
			.description("Found host with given port").build();

	private List<PropertyDescriptor> descriptors;

	private Set<Relationship> relationships;

	private static HostScanner hostScanner;

	@Override
	protected void init(final ProcessorInitializationContext context) {
		final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
		descriptors.add(SCAN_PORT);
		this.descriptors = Collections.unmodifiableList(descriptors);

		final Set<Relationship> relationships = new HashSet<Relationship>();
		relationships.add(REL_FOUND);
		this.relationships = Collections.unmodifiableSet(relationships);
		this.getLogger().info("[INFO] HostDiscoveryProcessor init");

		hostScanner = new HostScanner();
		hostScanner.setLogger(getLogger());
	}

	@Override
	public Set<Relationship> getRelationships() {
		return this.relationships;
	}

	@Override
	public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
		return descriptors;
	}

	@OnScheduled
	public void onScheduled(final ProcessContext context) {
		this.getLogger().info("[INFO] HostDiscoveryProcessor onScheduled");
		final String scanPort = context.getProperty(SCAN_PORT).getValue();
		hostScanner.setPort(Integer.parseInt(scanPort));
	}

	@Override
	public void onTrigger(final ProcessContext context, final ProcessSession aSession) throws ProcessException {
		FlowFile flowFile = aSession.get();
		if (flowFile == null) {
			return;
		}
		// Logger logger = Logger.getLogger(MyClass.class.getName());
		this.getLogger().info("[INFO] HostDiscoveryProcessor onTrigger - start scan");
		List<String> ips = hostScanner.scan();
		this.getLogger().info("[INFO] Scan completed");

		flowFile = aSession.write(flowFile, new OutputStreamCallback() {
			@Override
			public void process(OutputStream aStream) throws IOException {
				aStream.write(listToBytes(ips));
			}
		});
		aSession.transfer(flowFile, REL_FOUND);
		aSession.commit();
	}
}

1 ACCEPTED SOLUTION

Master Guru

@Gu Gur Because your processor is annotated with INPUT_FORBIDDEN, it can never have flow files coming in, so you should remove the beginning of onTrigger where you have:

FlowFile flowFile = aSession.get();
if (flowFile == null) {
	return;
}

With no incoming flow files, aSession.get() always returns null, so onTrigger returns right away, before it reaches your log statement and the scan.

Instead, you will want to create a new flow file, like:

FlowFile flowFile = aSession.create();
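
To make the difference concrete, here is a small plain-Java sketch of the two patterns. It does not use the NiFi API: FakeSession and FakeFlowFile are hypothetical stand-ins that only mimic the get(), create(), and transfer() calls discussed above, and listToBytes is a guess at the helper the question calls but does not show (one address per line, UTF-8). Treat it as an illustration of the control flow, not as processor code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class SourceTriggerSketch {

    /** Hypothetical stand-in for a flow file: only a byte payload. */
    static final class FakeFlowFile {
        byte[] content = new byte[0];
    }

    /** Hypothetical stand-in for a session; this is NOT NiFi's ProcessSession. */
    static final class FakeSession {
        // Stays empty here, modelling a processor with no incoming connection.
        final Queue<FakeFlowFile> incoming = new ArrayDeque<>();
        final List<FakeFlowFile> transferred = new ArrayList<>();

        FakeFlowFile get() { return incoming.poll(); } // null when nothing is queued
        FakeFlowFile create() { return new FakeFlowFile(); }
        void transfer(FakeFlowFile flowFile) { transferred.add(flowFile); }
    }

    /** Assumed shape of the listToBytes helper: one address per line, UTF-8. */
    static byte[] listToBytes(List<String> ips) {
        return String.join("\n", ips).getBytes(StandardCharsets.UTF_8);
    }

    /** Pattern from the question: asks for an input flow file first. */
    static boolean triggerWithGet(FakeSession session, List<String> ips) {
        FakeFlowFile flowFile = session.get();
        if (flowFile == null) {
            return false; // nothing queued, so the rest is never reached
        }
        flowFile.content = listToBytes(ips);
        session.transfer(flowFile);
        return true;
    }

    /** Pattern from the answer: a source step creates its own flow file. */
    static boolean triggerWithCreate(FakeSession session, List<String> ips) {
        FakeFlowFile flowFile = session.create();
        flowFile.content = listToBytes(ips);
        session.transfer(flowFile);
        return true;
    }

    public static void main(String[] args) {
        List<String> ips = Arrays.asList("10.0.0.5", "10.0.0.9");

        FakeSession first = new FakeSession();
        System.out.println("get():    ran=" + triggerWithGet(first, ips)
                + ", transferred=" + first.transferred.size());

        FakeSession second = new FakeSession();
        System.out.println("create(): ran=" + triggerWithCreate(second, ips)
                + ", transferred=" + second.transferred.size());
    }
}
```

In the real processor the same change is a swap of the get-and-null-check for a create call on the session; the write and transfer steps from the question stay as they are.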


2 REPLIES

New Member

Thank you very much!