Custom processor Groovy to Java code change

Hi All,

I'm trying to create a simple custom processor. Until now I used the ExecuteScript processor to loop over each line with the code below:

import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.StreamCallback

flowFile = session.write(flowFile, {inputStream, outputStream ->
    inputStream.eachLine { line ->
        def a = line.split("\\|", -1)
        outputStream.write("$Test ABCABC\n".toString().getBytes(StandardCharsets.UTF_8))
    }
} as StreamCallback)

How can I rewrite this in Java so I can put it into the custom processor code below?

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package hwx.processors.hwx;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("MY_PROPERTY")
            .displayName("My property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("MY_RELATIONSHIP")
            .description("Example relationship")
            .build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        ////// Code here
        /////////////////
        session.transfer(flowFile, MY_RELATIONSHIP);
    }
}

Thanks a lot

Any help? How can I rewrite the code below in Java for a custom processor?

flowFile = session.write(flowFile, {inputStream, outputStream ->
    inputStream.eachLine { line ->
        outputStream.write("Test ABCABC\n".toString().getBytes(StandardCharsets.UTF_8))
    }
} as StreamCallback)

Super Guru

In the Groovy script, the "as StreamCallback" clause is called "type coercion": it lets you take a Closure and tell Groovy which interface or class to treat it as. For a single-method interface like StreamCallback (it has one process() method), the closure is coerced into a StreamCallback instance whose process() runs the closure body.

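In Java 8 you use a lambda instead, and you don't want the "as StreamCallback" at the end: the compiler knows from the method signature that the lambda should be a StreamCallback and treats it as such. Note that the syntax differs from a Groovy closure: in Java the parameters go in parentheses before the arrow, (inputStream, outputStream) -> { ... }, not inside the braces as in {inputStream, outputStream -> ... }.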
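To see the lambda-instead-of-coercion point in isolation, here is a minimal sketch that runs without NiFi. It assumes a hypothetical MyStreamCallback interface with the same process(InputStream, OutputStream) shape as StreamCallback, and a hypothetical rewrite() helper standing in for session.write(); none of these names are NiFi API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in with the same shape as NiFi's StreamCallback:
// a single abstract method that receives an InputStream and an OutputStream.
@FunctionalInterface
interface MyStreamCallback {
    void process(InputStream in, OutputStream out) throws IOException;
}

final class LambdaTargetDemo {

    // Hypothetical helper playing the role of session.write(flowFile, callback):
    // it feeds the old content to the callback and returns what the callback wrote.
    static String rewrite(String content, MyStreamCallback callback) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            callback.process(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    static String upperCase(String content) {
        // No "as MyStreamCallback" is needed: the parameter type of rewrite()
        // tells the compiler which interface this two-argument lambda implements.
        return rewrite(content, (in, out) -> {
            int b;
            while ((b = in.read()) != -1) {
                out.write(Character.toUpperCase(b)); // byte-wise, so ASCII only
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(upperCase("hello")); // prints HELLO
    }
}
```

In the processor itself the same kind of lambda would be passed straight to session.write(flowFile, ...), with NiFi's StreamCallback as the target type.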

inputStream.eachLine {} is a method that Groovy adds to InputStream; it reads one line at a time and passes each line to the closure. In Java 8+, you can do the same by creating a BufferedReader from an InputStreamReader that wraps the incoming InputStream:

BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));

Then you can call br.lines() to get a Stream<String> in which each element is one line of the input.
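A small sketch of the BufferedReader/lines() replacement for eachLine, using an in-memory stream instead of FlowFile content. EachLineDemo and readLines() are illustrative names, and collecting into a List is only done here to make the result easy to inspect; inside a write callback you would handle each line as it is read.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

final class EachLineDemo {

    // Java 8 counterpart of Groovy's inputStream.eachLine { line -> ... }:
    // wrap the stream in a BufferedReader and take a Stream<String> of its lines.
    static List<String> readLines(InputStream inputStream) {
        // The charset is passed explicitly; the one-argument InputStreamReader
        // constructor would fall back to the platform default encoding.
        BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        // lines() reports read errors as UncheckedIOException, so no try/catch is required here.
        return br.lines().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream("a|b\nc|d\n".getBytes(StandardCharsets.UTF_8));
        for (String line : readLines(in)) {
            System.out.println("line: " + line);
        }
    }
}
```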

The line.split() method you are using is Java's String.split(), so you can keep that, although it returns a String[], so your "a" variable has to be declared as String[] (there is no "def" in Java, and "var" only arrived in Java 10). The same goes for the outputStream.write() call itself. You might end up with something like this (but I didn't try it):

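// Imports: java.io.* (BufferedReader, InputStreamReader, IOException, UncheckedIOException), java.nio.charset.StandardCharsets
flowFile = session.write(flowFile, (inputStream, outputStream) -> {
    BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
    br.lines().forEach(line -> {
        String[] a = line.split("\\|", -1);
        try {
            outputStream.write("SOMETHING WITH a[]\n".getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            // forEach() takes a Consumer, which cannot throw the checked IOException
            throw new UncheckedIOException(e);
        }
    });
});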

It doesn't look like you're using the "a" array in your output; I imagine you changed it to test things out before you pasted it here. If you have trouble using "a" in your Java code when writing the output, reply to this answer and I will try to help you through it.
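As an illustration of using "a", here is a sketch that builds each output line from the split fields. Which fields to write is a hypothetical choice (here the first field and the field count), as are the names SplitDemo, formatLine() and writeLine(). It also shows why the -1 limit matters: without it, split() drops trailing empty fields.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class SplitDemo {

    // Builds one output line from a pipe-delimited input line. Which fields to
    // use (a[0] and a.length here) is a hypothetical choice for illustration.
    static String formatLine(String line) {
        // split() takes a regex, so the pipe is escaped as "\\|"; the limit of -1
        // keeps trailing empty fields, e.g. "x|y|" -> ["x", "y", ""].
        String[] a = line.split("\\|", -1);
        return a[0] + " ABCABC (" + a.length + " fields)\n";
    }

    // What the body of the per-line loop could look like inside the write callback.
    static void writeLine(String line, OutputStream outputStream) throws IOException {
        outputStream.write(formatLine(line).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        writeLine("x|y|", System.out); // x ABCABC (3 fields)
        writeLine("x|y", System.out);  // x ABCABC (2 fields)
        System.out.flush();
    }
}
```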

Hello @Matt. Here is the new code as you advised. I had to use session.read(flowFile) to get the inputStream:

BufferedReader br = new BufferedReader(new InputStreamReader(session.read(flowFile), StandardCharsets.UTF_8));
flowFile = session.write(flowFile, outputStream -> {
    br.lines().forEach(line -> {
        try {
            outputStream.write((line + " SOMETHING").getBytes(StandardCharsets.UTF_8));
        } catch (IOException ex) {
            Logger.getLogger(MyProcessor.class.getName() + line).log(Level.SEVERE, null, ex);
        }
    });
});
session.transfer(flowFile, MY_RELATIONSHIP);

I'm still getting the error below:

ProcessSession.read(FlowFile) has not been closed

2018-12-14 13:35:55,707 WARN [Timer-Driven Process Thread-10] o.a.n.controller.tasks.ConnectableTask Administratively Yielding MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] due to uncaught Exception: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3147)
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3142)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2632)
at nifi.processors.nifi.MyProcessor.onTrigger(MyProcessor.java:111)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Thanks @Matt for your help

I followed the steps you highlighted; code below:

try {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    InputStream inputStream = session.read(flowFile);
    BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
    flowFile = session.write(flowFile, outputStream -> {
        br.lines().forEach(line -> {
            try {
                outputStream.write((line + " SOMETHING WITH a[]").getBytes(StandardCharsets.UTF_8));
            } catch (IOException ex) {
                Logger.getLogger(MyProcessor.class.getName() + line).log(Level.SEVERE, null, ex);
            }
        });
    });
    inputStream.close();
    session.transfer(flowFile, MY_RELATIONSHIP);
} catch (IOException ex) {
}

I'm not sure how I should close the stream. Error below:

2018-12-14 13:35:55,706 ERROR [Timer-Driven Process Thread-10] nifi.processors.nifi.MyProcessor MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] failed to process session due to java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed; Processor Administratively Yielded for 1 sec: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3147)
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3142)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2632)
at nifi.processors.nifi.MyProcessor.onTrigger(MyProcessor.java:111)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Super Guru

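You shouldn't need to move the inputStream handling out into a separate session.read(). The error occurs because the InputStream returned by session.read(flowFile) is still open when session.write() is called, and NiFi won't modify a FlowFile while a stream on its content is open (that is what "an InputStream created by ProcessSession.read(FlowFile) has not been closed" means). Closing it after session.write() returns, as in your second attempt, is too late, and you can't close it before session.write() either, because your BufferedReader reads from that stream inside the write callback. There are two callback-based session.write() methods. One takes an OutputStreamCallback, which only receives an outputStream; that's what you'd more likely use if reading and writing had to happen separately. In your case, since you are overwriting the incoming FlowFile based on its own content, you should use the StreamCallback version, which passes both the inputStream and the outputStream to the lambda. Then you can create the BufferedReader and write the output in the same method.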
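A sketch of the single-callback approach, written as a plain static method with the same parameters as StreamCallback.process() so it can be run and tested without NiFi. LineRewriter, rewriteLines() and run() are hypothetical names; the NiFi call in the comment assumes the usual processor imports and is untested. It uses a readLine() loop rather than br.lines().forEach(...), because outputStream.write() throws a checked IOException that a forEach lambda cannot throw.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class LineRewriter {

    // Same parameter list as StreamCallback.process(InputStream, OutputStream).
    // Inside onTrigger it could be used like this (untested, NiFi imports assumed):
    //   flowFile = session.write(flowFile, (in, out) -> LineRewriter.rewriteLines(in, out));
    static void rewriteLines(InputStream in, OutputStream out) throws IOException {
        BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        // A plain loop lets write() throw IOException straight out of the method.
        while ((line = br.readLine()) != null) {
            String[] a = line.split("\\|", -1);
            out.write((a[0] + " SOMETHING\n").getBytes(StandardCharsets.UTF_8));
        }
    }

    // Test harness: runs rewriteLines() on in-memory content instead of a FlowFile.
    static String run(String content) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            rewriteLines(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.print(run("x|1\ny|2\n"));
    }
}
```

Because the same callback both reads and writes, no separate InputStream is left open, which is what the IllegalStateException was complaining about.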

Is there a reason you needed to move the inputStream part into a separate session.read()?
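If a separate read really is needed, one option (a sketch, not the only way) is to read the content fully inside try-with-resources so the stream is closed before session.write() is called. The TrackingInputStream wrapper below is hypothetical and only exists to show that close() has run by the time writing starts; ReadThenWriteDemo, readAllLines() and readThenWrite() are illustrative names.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

final class ReadThenWriteDemo {

    // Hypothetical wrapper that records whether close() was called; it stands in
    // for the InputStream that session.read(flowFile) returns.
    static final class TrackingInputStream extends FilterInputStream {
        boolean closed;

        TrackingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // try-with-resources closes the reader, and with it the underlying stream,
    // as soon as the block exits, whether normally or via an exception.
    static List<String> readAllLines(InputStream in) throws IOException {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return br.lines().collect(Collectors.toList());
        }
    }

    // Step 1: read everything and close. Step 2: write. In NiFi, step 2 would be
    // session.write(flowFile, outputStream -> ...) using the buffered lines.
    static String readThenWrite(TrackingInputStream source) {
        List<String> lines;
        try {
            lines = readAllLines(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!source.closed) {
            throw new IllegalStateException("input stream still open before writing");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String line : lines) {
            byte[] bytes = (line + " SOMETHING\n").getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length); // this overload throws no checked exception
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        TrackingInputStream source = new TrackingInputStream(
                new ByteArrayInputStream("x\ny\n".getBytes(StandardCharsets.UTF_8)));
        System.out.print(readThenWrite(source));
        System.out.println("closed before write: " + source.closed);
    }
}
```

The trade-off is that every line is held in memory, so for large FlowFiles the single StreamCallback approach is usually the better fit.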

No reason, actually. I tried to use the code provided above, but it didn't recognise the inputStream object:

flowFile = session.write(flowFile, {inputStream, outputStream ->
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
br.lines().forEach(line -> {
    String[] a = line.split("\\|", -1);
    outputStream.write("SOMETHING WITH a[]".getBytes(StandardCharsets.UTF_8));
});
});
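The inputStream not being recognised is most likely because {inputStream, outputStream -> ...} is Groovy closure syntax, which Java does not parse as a lambda; in Java the parameters go in parentheses before the arrow. A second compile problem is that outputStream.write() throws IOException, which the Consumer passed to forEach() may not throw. The sketch below shows both fixes in a form that runs without NiFi. TwoStreamCallback, APPEND_SOMETHING and run() are hypothetical names, and wrapping in UncheckedIOException and unwrapping afterwards is one common pattern, not something NiFi requires.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class ForEachWriteDemo {

    // Hypothetical interface with the same shape as NiFi's StreamCallback.
    @FunctionalInterface
    interface TwoStreamCallback {
        void process(InputStream in, OutputStream out) throws IOException;
    }

    // Java lambda syntax puts the parameters in parentheses before the arrow:
    // (inputStream, outputStream) -> { ... }. Groovy's { in, out -> ... } is not valid Java.
    static final TwoStreamCallback APPEND_SOMETHING = (inputStream, outputStream) -> {
        BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        try {
            br.lines().forEach(line -> {
                try {
                    outputStream.write((line + " SOMETHING\n").getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    // forEach() takes a Consumer, whose accept() may not throw IOException.
                    throw new UncheckedIOException(e);
                }
            });
        } catch (UncheckedIOException e) {
            // Unwrap so the caller sees the original IOException again.
            throw e.getCause();
        }
    };

    static String run(String content) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        APPEND_SOMETHING.process(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), out);
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        System.out.print(run("a\nb\n"));
    }
}
```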