Support Questions

Find answers, ask questions, and share your expertise

Custom processor - This flowfile was not created in this session and was not transferred to any relationship via ProcessSession.transfer()

Explorer

I have a custom processor which takes in a flowfile and based on the given attributes it creates new attributes and returns. However, I keep getting this error and I've checked the forums and I can't see where the problem is. Any help is appreciated. 

 

o.a.n.controller.tasks.ConnectableTask Processing halted: uncaught exception in Component [LocationParserProcessor[id=630e7e24-0183-1000-905c-8f31dc8a1e22]]
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=82eb8ad5-a9e3-473a-991b-0c4a3ab2bf36,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1663856909589-1, container=default, section=1], offset=698, length=188],offset=0,name=file.csv,size=96] transfer relationship not specified. This FlowFile was not created in this session and was not transferred to any Relationship via ProcessSession.transfer()
	at org.apache.nifi.controller.repository.StandardProcessSession.validateCommitState(StandardProcessSession.java:259)
	at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:274)
	at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:556)
	at org.apache.nifi.controller.repository.StandardProcessSession.commitAsync(StandardProcessSession.java:510)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1356)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

 

 

 

 

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class LocationParserProcessor extends AbstractProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFile attributes updated")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFile attributes not updated")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        try {
            String att1 = flowFile.getAttribute("att1");
            String att_list = flowFile.getAttribute("att_list");
            String[] atts = att_list.split(",");
            String all_atts="";
            for(int i=0;i<atts.length;i++){
                all_atts=all_atts+" "+flowFile.getAttribute(atts[i]);
            }
            
            List<String> result = Conversion.convert(all_atts,atts);
            HashMap<String, String> attributes = new HashMap<>();
            attributes.put("result_1",result.get(0);
            attributes.put("result_2",result.get(1));
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);
            return;
        }
        catch (Exception e) {
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        
    }
}

 

 

0 REPLIES 0