
failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[etc] transfer relationship not specified

Hi there,
I have a custom processor written in Java. The happy path works fine and the flow file is transferred to the success relationship, but if an exception occurs, the flow file stays in the queue and the processor logs "failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[etc] transfer relationship not specified".

Can you please advise what is wrong with this code? How should I handle exceptions in NiFi? I want the flow file to be sent to the failure relationship on any exception.

 

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
            .description("Success relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
            .description("Failure relationship").build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
            .description("Original relationship").build();


    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        getLogger().debug("onTrigger");
        FlowFile originalFlowfile = session.get();
        if (originalFlowfile == null) {
            return;
        }
        FlowFile flowfile = originalFlowfile;
        Map<String, String> attributes = flowfile.getAttributes();
        FlowFile copy = null;
        try {

            copy = session.clone(flowfile);

            getLogger().debug("processing flow file input stream");

            flowfile = session.write(flowfile, new StreamCallback() {
                @Override
                public void process(InputStream inputStream, OutputStream out) throws IOException {
                    ByteArrayOutputStream fos = null;
                    BufferedInputStream bis = null;
                    try {
                        bis = new BufferedInputStream(inputStream);
                        fos = processFile(bis); // if processFile throws here, I get the exception described above
                        out.write(fos.toByteArray());
                    } finally {
                        if (fos != null) {
                            getLogger().debug("closing ByteArrayOutputStream");
                            fos.close();
                        }
                        if (bis != null) {
                            getLogger().info("closing BufferedInputStream");
                            bis.close();
                        }
                    }
                }
            });


            getLogger().info("File transformation completed successfully");
           
            // set flowfile into session
            session.transfer(flowfile, REL_SUCCESS);
            if (copy != null) {
                // post original file
                copy = session.putAttribute(copy, FILE_TYPE, ORIGINAL_FILE);
                // To write the original file
                session.transfer(copy, REL_ORIGINAL);
            }
        } catch (AssertionError | Exception ex) {
            getLogger().error("Error occurred during processing flow File. ", ex);
            session.transfer(flowfile, REL_FAILURE);
        }
    }

 

2019-09-25 07:55:50,251 ERROR [Timer-Driven Process Thread-4] i.q.n.m.csv.processor.CsvMasterProcessor CsvMasterProcessor[id=e2be576d-016b-1000-6fce-444f34403645] CsvMasterProcessor[id=e2be576d-016b-1000-6fce-444f34403645] failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=73a2283a-44c5-468d-adb8-cf4c3887ffaa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1569393730679-1, container=default, section=1], offset=0, length=1200755],offset=0,name=e55ab3f4-3645-4493-aac4-4e63e754653f,size=1200755] transfer relationship not specified; Processor Administratively Yielded for 1 sec: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=73a2283a-44c5-468d-adb8-cf4c3887ffaa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1569393730679-1, container=default, section=1], offset=0, length=1200755],offset=0,name=e55ab3f4-3645-4493-aac4-4e63e754653f,size=1200755] transfer relationship not specified
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=73a2283a-44c5-468d-adb8-cf4c3887ffaa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1569393730679-1, container=default, section=1], offset=0, length=1200755],offset=0,name=e55ab3f4-3645-4493-aac4-4e63e754653f,size=1200755] transfer relationship not specified
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:270)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:341)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) 
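
A note on the likely cause, offered as a sketch rather than a confirmed diagnosis: in the code above, the clone created with session.clone(flowfile) is transferred only on the success path. When processFile throws, the catch block sends the original to failure, but the clone is neither transferred nor removed, and a session cannot commit while it still holds a FlowFile without a destination. NiFi's ProcessSession provides remove(FlowFile) for dropping a FlowFile that is no longer needed, so calling session.remove(copy) in the catch block (when copy is not null) is one plausible fix. The example below is a self-contained simulation of that rule. MiniSession, cloneOf, stateOf, and trigger are hypothetical names invented for illustration and are not part of the NiFi API.

```java
import java.util.HashMap;
import java.util.Map;

final class SessionSketch {

    /** Hypothetical stand-in for a session; NOT the real NiFi ProcessSession. */
    public static final class MiniSession {
        // FlowFile name -> relationship name, "REMOVED", or null while unresolved
        private final Map<String, String> state = new HashMap<>();

        public String get(String name) {
            state.put(name, null);
            return name;
        }

        public String cloneOf(String name) {
            String clone = name + "-clone";
            state.put(clone, null);
            return clone;
        }

        public void transfer(String name, String relationship) {
            state.put(name, relationship);
        }

        public void remove(String name) {
            state.put(name, "REMOVED");
        }

        // Mimics the rule behind the error: every FlowFile in the session
        // must be transferred or removed before the session can commit.
        public void commit() {
            for (Map.Entry<String, String> entry : state.entrySet()) {
                if (entry.getValue() == null) {
                    throw new IllegalStateException(
                            entry.getKey() + " transfer relationship not specified");
                }
            }
        }

        public String stateOf(String name) {
            return state.get(name);
        }
    }

    /**
     * Simulates one trigger. If failTransform is true, the transformation step
     * throws. cleanUpClone toggles the proposed fix in the catch block.
     */
    public static MiniSession trigger(boolean failTransform, boolean cleanUpClone) {
        MiniSession session = new MiniSession();
        String original = session.get("input");
        String copy = null;
        try {
            copy = session.cloneOf(original);
            if (failTransform) {
                throw new RuntimeException("simulated processFile failure");
            }
            session.transfer(original, "success");
            session.transfer(copy, "original");
        } catch (RuntimeException ex) {
            session.transfer(original, "failure");
            if (copy != null && cleanUpClone) {
                // Proposed fix: resolve the clone too, otherwise commit fails.
                session.remove(copy);
            }
        }
        session.commit();
        return session;
    }

    public static void main(String[] args) {
        MiniSession fixed = trigger(true, true);
        System.out.println("original: " + fixed.stateOf("input"));
        System.out.println("clone: " + fixed.stateOf("input-clone"));
        try {
            trigger(true, false);
        } catch (IllegalStateException ex) {
            System.out.println("without cleanup: " + ex.getMessage());
        }
    }
}
```

In the real processor, the equivalent change would be to handle copy inside the catch block, either by removing it or by routing it to a relationship, so that no FlowFile is left without a destination when the session commits.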
