Created 09-22-2022 07:55 AM
I have a custom processor which takes in a flowfile and based on the given attributes it creates new attributes and returns. However, I keep getting this error and I've checked the forums and I can't see where the problem is. Any help is appreciated.
o.a.n.controller.tasks.ConnectableTask Processing halted: uncaught exception in Component [LocationParserProcessor[id=630e7e24-0183-1000-905c-8f31dc8a1e22]]
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=82eb8ad5-a9e3-473a-991b-0c4a3ab2bf36,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1663856909589-1, container=default, section=1], offset=698, length=188],offset=0,name=file.csv,size=96] transfer relationship not specified. This FlowFile was not created in this session and was not transferred to any Relationship via ProcessSession.transfer()
at org.apache.nifi.controller.repository.StandardProcessSession.validateCommitState(StandardProcessSession.java:259)
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:274)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:556)
at org.apache.nifi.controller.repository.StandardProcessSession.commitAsync(StandardProcessSession.java:510)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1356)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class LocationParserProcessor extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile attributes updated")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile attributes not updated")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
try {
String att1 = flowFile.getAttribute("att1");
String att_list = flowFile.getAttribute("att_list");
String[] atts = att_list.split(",");
String all_atts="";
for(int i=0;i<atts.length;i++){
all_atts=all_atts+" "+flowFile.getAttribute(atts[i]);
}
List<String> result = Conversion.convert(all_atts,atts);
HashMap<String, String> attributes = new HashMap<>();
attributes.put("result_1",result.get(0);
attributes.put("result_2",result.get(1));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
return;
}
catch (Exception e) {
session.transfer(flowFile, REL_FAILURE);
return;
}
}
}
Created 03-22-2023 08:45 AM
@Jacccs Did you solve your problem? I have a similar case