Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

In nifi custom processor is throwing transfer relationship not specified exception

Highlighted

In nifi custom processor is throwing transfer relationship not specified exception

New Contributor

In NiFi, I am creating a custom processor that reads multi-row CSV data, converts each row into a JSON document, and sends each document on.

Below is the custom processor code:

package hwx.processors.demo;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor EXPECTED_JSON = new PropertyDescriptor
            .Builder().name("EXPECTED_JSON")
            .displayName("EXPECTED_JSON")
            .description("EXPECTED_JSON")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    
    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor
            .Builder().name("DELIMITER")
            .displayName("DELIMITER")
            .description("DELIMITER For CSV")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("SUCCESS")
            .description("Example relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(EXPECTED_JSON);
        descriptors.add(DELIMITER);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }
    
    final AtomicReference<String> value = new AtomicReference<>();
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        //String r=context.getProperty("MY_PROPERTY").toString();
        //Pattern pattern = Pattern.compile("\$(\d)+");
        session.read(flowFile, in -> {
            String r="";
            try {
            r= IOUtils.toString(in);
            /*if(r.contains("$"))
            {
                r=r.replaceAll("\$", "\\\$");
            }*/
            }
            catch (Exception e) {
                getLogger().error(e.getMessage() + " exception.", e);
            }
            
            String[] test=r.split(context.getProperty("DELIMITER").toString());
            for(String s1:test){
            try{
            String[] s=s1.replaceAll("\r\n", " ").split(" ");
            String[] s2=s1.split(",");
            Pattern pattern = Pattern.compile("\$(\d)+");
            String text=context.getProperty("EXPECTED_JSON").toString();
            Matcher m = pattern.matcher(text);
            StringBuffer sb = new StringBuffer();
            while (m.find()) {
                if(Integer.parseInt(m.group(1))>s2.length)
                {
                    m.appendReplacement(sb, "");
                }
                else{
                m.appendReplacement(sb, s2[Integer.parseInt(m.group(1)) - 1]);
                }
            }
            m.appendTail(sb);
            value.set(sb.toString());
            sendRecord(session,flowFile);
            } catch (Exception e) {
                getLogger().error(e.getMessage() + " Routing to failure.", e);
            }
            }
        });
        
    }
    
    public void sendRecord(final ProcessSession session,FlowFile flowFile)
    {
        flowFile = session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                out.write(value.get().getBytes());
            }
        });
        session.transfer(flowFile,SUCCESS);
        session.commit();
        }
}


108567-capture-1.png

Below is the custom processor configuration. In EXPECTED_JSON the user supplies the required JSON payload, and the custom processor replaces $1 with the first element of the CSV record, $2 with the second, and so on. Each row of the CSV therefore has to be converted into a separate JSON document and sent.

EXPECTED_JSON payload:{"test1":"$1","test2":"$2","test3":"$3","test4":"$4"}


108607-capture-2.png Below is the exception from custom processor:

2019-05-10 19:40:40,239 ERROR [Timer-Driven Process Thread-10] hwx.processors.demo.MyProcessor MyProcessor[id=a211cdad-016a-1000-7505-c725dfdbe9da] MyProcessor[id=a211cdad-016a-1000-7505-c725dfdbe9da] failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified; Processor Administratively Yielded for 1 sec: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:271)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:342)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-05-10 19:40:40,239 WARN [Timer-Driven Process Thread-10] o.a.n.controller.tasks.ConnectableTask Administratively Yielding MyProcessor[id=a211cdad-016a-1000-7505-c725dfdbe9da] due to uncaught Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:271)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:342)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

So, how can I resolve the exception and send each CSV record/row as a separate JSON document?