package inn.custom.processor; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"mongo"}) @CapabilityDescription("Read data from mongo") public class NodeReadFromMongo extends AbstractProcessor { static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() .name("databaseName") .description("DataBaseName") .required(true) .expressionLanguageSupported(true) .build(); static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() .name("collectionName") .description("CollectionName") .required(true) .expressionLanguageSupported(true) .build(); static final PropertyDescriptor COLUMN_NAME = new PropertyDescriptor.Builder() .name("columnName") .description("columnName") .required(true) .expressionLanguageSupported(true) .build(); static final PropertyDescriptor START_TIME = new PropertyDescriptor.Builder() .name("startTime") .description("startTime") .required(true) .expressionLanguageSupported(true) .build(); static final PropertyDescriptor END_TIME = new PropertyDescriptor.Builder() .name("endTime") .description("endTime") .required(true) .expressionLanguageSupported(true) .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All successful fetches are routed to this relationship.") .build(); static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("All failed fetches are routed to this relationship.") .build(); static final List properties; static final Set relationships; static { List props = new ArrayList(); props.add(DATABASE_NAME); props.add(COLLECTION_NAME); props.add(COLUMN_NAME); props.add(START_TIME); props.add(END_TIME); properties = Collections.unmodifiableList(props); Set rels = new HashSet(); rels.add(REL_SUCCESS); rels.add(REL_FAILURE); relationships = Collections.unmodifiableSet(rels); } protected List getSupportedPropertyDescriptors() { return properties; } public Set getRelationships() { return relationships; } @OnScheduled public void onScheduled(ProcessContext context) { } public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { System.out.println("On trigger method "); FlowFile flowFile = session.get(); if (flowFile == null) { return; } } }