Created 04-27-2017 07:40 PM
Hello,
I created custom processor, added loging for onTrigger, init, onScheduled methods.
But onTrigger never called. When I press Run in Nifi GUI, I see onScheduled method called, but not onTrigger.
Thank you for your help.
My code example:
@TriggerSerially @InputRequirement(Requirement.INPUT_FORBIDDEN) public static final PropertyDescriptor SCAN_PORT = new PropertyDescriptor.Builder().name("Port") .description("Scan port").defaultValue("8080").required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); public static final Relationship REL_FOUND = new Relationship.Builder().name("found") .description("Found host with given port").build(); private List<PropertyDescriptor> descriptors; private Set<Relationship> relationships; private static HostScanner hostScanner; @Override protected void init(final ProcessorInitializationContext context) { final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); descriptors.add(SCAN_PORT); this.descriptors = Collections.unmodifiableList(descriptors); final Set<Relationship> relationships = new HashSet<Relationship>(); relationships.add(REL_FOUND); this.relationships = Collections.unmodifiableSet(relationships); this.getLogger().info("[INFO] HostDiscoveryProcessor init"); hostScanner = new HostScanner(); hostScanner.setLogger(getLogger()); } @Override public Set<Relationship> getRelationships() { return this.relationships; } @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { return descriptors; } @OnScheduled public void onScheduled(final ProcessContext context) { this.getLogger().info("[INFO] HostDiscoveryProcessor onScheduled"); final String scanPort = context.getProperty(SCAN_PORT).getValue(); hostScanner.setPort(Integer.parseInt(scanPort)); } @Override public void onTrigger(final ProcessContext context, final ProcessSession aSession) throws ProcessException { FlowFile flowFile = aSession.get(); if (flowFile == null) { return; } // Logger logger = Logger.getLogger(MyClass.class.getName()); this.getLogger().info("[INFO] HostDiscoveryProcessor onTrigger - start scan"); List<String> ips = hostScanner.scan(); this.getLogger().info("[INFO] Scan completed"); flowFile = aSession.write(flowFile, new OutputStreamCallback() { @Override public void process(OutputStream aStream) throws IOException { aStream.write(listToBytes(ips)); } }); aSession.transfer(flowFile, REL_FOUND); aSession.commit(); }
Created 04-27-2017 09:04 PM
@Gu Gur Since your processor has INPUT_FORBIDDEN, you can never have flow files coming in to your processor, so you should remove the beginning of onTrigger where you have:
FlowFile flowFile = aSession.get(); if(flowFile ==null){ return; }
That will always return and exit your onTrigger because there are no flow files.
Instead you will want to create a new flow file like:
FlowFile flowFile = session.create();
Created 04-27-2017 09:04 PM
@Gu Gur Since your processor has INPUT_FORBIDDEN, you can never have flow files coming in to your processor, so you should remove the beginning of onTrigger where you have:
FlowFile flowFile = aSession.get(); if(flowFile ==null){ return; }
That will always return and exit your onTrigger because there are no flow files.
Instead you will want to create a new flow file like:
FlowFile flowFile = session.create();
Created 04-28-2017 07:55 AM
Thank you very much!