Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

How to create a flow files and add attributes in a custom processor

avatar

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.demo.processors;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.InvokeHTTP;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Tags({"GetMember"})
@CapabilityDescription("Get a Member details from member collection")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
public class GetMemberProcessor extends InvokeHTTP {

    public static final PropertyDescriptor ID = new PropertyDescriptor
            .Builder().name("ID")
            .displayName("Id")
            .description("Member Id")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    
    private static final String CUSTOM_URL = "https://my-url/api/v1";
    private static final String GET_MEMBER_PATH = "/members";

    private List<PropertyDescriptor> descriptors;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        super.init(context);
        List<PropertyDescriptor> invokeDescriptors = super.getSupportedPropertyDescriptors();
        descriptors = new ArrayList<>();
        if (invokeDescriptors != null) {
            for (PropertyDescriptor descriptor : invokeDescriptors) {
                if (descriptor.getName().equalsIgnoreCase("HTTP URL")) {
                    continue;
                }
                descriptors.add(descriptor);
            }
        }

        PropertyDescriptor HTTP_URL = new PropertyDescriptor.Builder()
                .name("HTTP URL")
                .description("Get Members URL")
                .required(false)
                .defaultValue(CUSTOM_URL + GET_MEMBER_PATH + "/${MEMBER_ID}")
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .addValidator(StandardValidators.URL_VALIDATOR)
                .build();


        descriptors.add(ID);
        descriptors.add(HTTP_URL);
        descriptors = Collections.unmodifiableList(descriptors);
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        String memberId = StringUtils.trimToEmpty(context.getProperty("ID").evaluateAttributeExpressions().getValue());
        String uri = StringUtils.trimToEmpty(context.getProperty("HTTP URL").evaluateAttributeExpressions().getValue());
        getLogger().info("ID: " + memberId + "  URI Found: " + uri);
        FlowFile requestFlowFile = session.get();
        if (requestFlowFile == null) {
            requestFlowFile = session.create();
        }
        session.putAttribute(requestFlowFile, "MEMBER_ID", memberId);
        String uri2 = StringUtils.trimToEmpty(context.getProperty("HTTP URL").evaluateAttributeExpressions().getValue());
        getLogger().info("ID: " + memberId + "  URI Found after: " + uri2);
        super.onTrigger(context, session);
    }
}

 

Hi all, I am new to nifi and trying to create a new custom processor extending the invokeHttp processor. I am trying to write a processor will some default values pre configured in to invokeHttp and ask my team to use this processor to add properties that will be used to update properties in the invokeHttp properties.

I am getting the following error while trying to run the custom processor

Processing halted: yielding [1 sec]
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=5ff2cede-6663-401e-ba5c-d72e62a00913,claim=,offset=0,name=5ff2cede-6663-401e-ba5c-d72e62a00913,size=0] transfer relationship not specified. This FlowFile was created in this session and was not transferred to any Relationship via ProcessSession.transfer()
at org.apache.nifi.controller.repository.StandardProcessSession.validateCommitState(StandardProcessSession.java:261)
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:276)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:559)
at org.apache.nifi.controller.repository.StandardProcessSession.commitAsync(StandardProcessSession.java:514)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244)
at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)

 

1 ACCEPTED SOLUTION

avatar

What happen when you tried to write the flowfile back? What error message did you get? Not sure what else you can do besides re writing the whole thing.

View solution in original post

8 REPLIES 8

avatar

Hi @vijay_loyalty ,

I have never tried what you are doing so I might be wrong here but my understanding is when you override the OnTrigger method in your custom invokehttp processor and use the following statement:

FlowFile requestFlowFile = session.get();

Its like you are emptying the queue from the flowfile and the  flowfile no longer exists in the session, therefore when you pass the call to the parent class and the parent class try to do the same thing using the same statement there wont be a flowfile hence the error you are getting. I actually tried the following script in an ExecuteGroovyScrip where I called the session.get() twice and it failed:

flowFile = session.get()
if(!flowFile) return
flowFile = session.get()
// Processing occurs here
session.transfer(flowFile, REL_SUCCESS)

 The error message was different but I think both error are related to the same issue.

Not sure what you can do about though, I was thinking you have two options:

1- Dont Inherit , instead copy the whole code from the original and create your custom invokehttp processor using the original code plus whatever you need to add.

2- I did not test this so Im not sure if its going to work, but before calling the super.OnTrigger(...) method  try to write the flowfile back to the session after adding the attributes using session.write(...)

 

If you find this is helpful please accept solution.

 

avatar
Thank you very much for the detailed information.

The reason I am creating the new flow is because the session.get returns a null and I need the flow file to have the attributes populated for the expression language in the url. If there is a way to create a dynamic string from the properties in the onTrigger that would be great. Also, this processor would be the first in the flow so I assume there will be no flow file to begin with hence the session.create(). The whole purpose of this processor is to give my non developer team a processor that only needs a couple of properties filled and the rest api will be invoked based on the pre-defined configuration in the custom processor.

avatar

I understand, The InvokeHttp processor has a lot of properties and there is a lot of dependencies  on what value triggers what property. In this case I would recommend writing your own custom processor to simplify and you can use some of the code from the original if you want to use same relatioships\same write & read attributes. Another option is to use Parameter Context. I know this is still a lot for them to populate but this way you can instruct them which parameter they need to use on which properties and which properties they need to set themselves. You can even populate those pre configured properties as well in an invokehttp processor and create a template from it and ask them to use the template one instead of creating new one each time. This way you dont have to write any custom code and save your self the effort of coding, troubleshooting and deployment.

avatar
Super Collaborator

"Also, this processor would be the first in the flow so I assume there will be no flow file" when I have a need similar to this, I'll still use a GenerateFlowFile processor to help schedule frequency and then a custom scripted processor that will add/replace data and/or add attributes to the FlowFile for everything down stream.

avatar

Thank you very much for your inputs. I will try solutions suggested and see which one suits best for my use case.

avatar

Hi @SAMSAL , 

You are correct, session.get() in the parent onTrigger gets the null value. I have tried writing the flowfile but that did not work. I have tried to call the transfer method but got an error saying `java.lang.IllegalArgumentException: Cannot transfer FlowFiles that are created in this Session back to self`

Here is the onTrigger code 

 

@Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        String memberId = StringUtils.trimToEmpty(context.getProperty("ID").evaluateAttributeExpressions().getValue());
        String uri = StringUtils.trimToEmpty(context.getProperty("HTTP URL").evaluateAttributeExpressions().getValue());
        getLogger().info("ID: " + memberId + "  URI Found: " + uri);
        FlowFile requestFlowFile = session.get();
        if (requestFlowFile == null) {
            requestFlowFile = session.create();
        }
        requestFlowFile = session.putAttribute(requestFlowFile, "MEMBER_ID", memberId);
//        requestFlowFile = session.write(requestFlowFile, (inputStream, outputStream) -> {
//            outputStream.write(inputStream.readAllBytes());
//        });
//        session.commit();
        String uri2 = StringUtils.trimToEmpty(context.getProperty("HTTP URL").evaluateAttributeExpressions(requestFlowFile).getValue());
        getLogger().info("ID: " + memberId + "  URI Found after: " + uri2);
        FlowFile flowFile = session.clone(requestFlowFile);
        session.transfer(flowFile);
        super.onTrigger(context, session);
    }

 
If you think there is anything else, I can try by extending the invokeHttp class that would be helpful, else I will just rewrite the invokeHttp code in my custom processor.

avatar

What happen when you tried to write the flowfile back? What error message did you get? Not sure what else you can do besides re writing the whole thing.

avatar

No error when I write the flow file but in the super onTrigger session.get() returns a null. So I have tried to transfer to the same queue using session.transfer(ff) but that gives an error saying Cannot transfer FlowFiles that are created in this Session back to self`.

Finally I have decided to write a custom processor from scratch using the code from invokeHttp for my usecase. Thank you all for the inputs.