Created on 12-12-2018 07:20 PM - edited 09-16-2022 06:59 AM
Hi All,
I'm trying to create a simple custom processor. Until now I have used the ExecuteScript processor to loop over the lines of a flow file, using the Groovy code below:
flowFile = session.write(flowFile, {inputStream, outputStream ->
    inputStream.eachLine { line ->
        def a = line.split("\\|", -1)
        outputStream.write("$Test ABCABC\n".toString().getBytes(StandardCharsets.UTF_8))
    }
} as StreamCallback)
How can I rewrite this in Java so that I can put it into the custom processor code below?
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package hwx.processors.hwx;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("MY_PROPERTY")
            .displayName("My property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("MY_RELATIONSHIP")
            .description("Example relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        ////// Code here /////////////////
        session.transfer(flowFile, MY_RELATIONSHIP);
    }
}
Thanks a lot
Created 12-13-2018 06:35 PM
Any help? How can I rewrite the code below in Java for a custom processor?
flowFile = session.write(flowFile, {inputStream, outputStream ->
    inputStream.eachLine { line ->
        outputStream.write("Test ABCABC\n".toString().getBytes(StandardCharsets.UTF_8))
    }
} as StreamCallback)
Created 12-13-2018 07:30 PM
In the Groovy script, the "as StreamCallback" clause is called "type coercion". It lets you write a Closure and tell Groovy which interface or class to treat it as. For a one-method interface like StreamCallback (it has a single process() method), you can coerce the closure into an instance of StreamCallback.
In Java 8 (using lambdas and the fact that Java is compiled), you don't want the "as StreamCallback" at the end of the lambda. The compiler knows from the method signature that the lambda should be a StreamCallback, and will treat it as such.
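To make the point about the method signature concrete, here is a minimal sketch that runs without NiFi on the classpath. DemoStreamCallback and the write() helper are hypothetical stand-ins that only mirror the shape of StreamCallback and session.write(); they are not NiFi classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for NiFi's StreamCallback: a single abstract
// method, so a lambda can implement it.
@FunctionalInterface
interface DemoStreamCallback {
    void process(InputStream in, OutputStream out) throws IOException;
}

final class LambdaInferenceDemo {

    // Hypothetical stand-in for session.write(flowFile, callback): hands the
    // content to the callback and returns whatever the callback wrote.
    static byte[] write(byte[] content, DemoStreamCallback callback) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream in = new ByteArrayInputStream(content)) {
            callback.process(in, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    static String copyWithSuffix(String text) {
        // No cast and no "as" clause: the parameter type of write() tells
        // the compiler that this lambda is a DemoStreamCallback.
        byte[] result = write(text.getBytes(StandardCharsets.UTF_8), (in, out) -> {
            int b;
            while ((b = in.read()) != -1) {
                out.write(b);   // copy the input through unchanged
            }
            out.write('!');     // then append one extra byte
        });
        return new String(result, StandardCharsets.UTF_8);
    }
}
```

Note that the Java lambda parameters go in parentheses before the arrow, (in, out) -> { ... }, whereas a Groovy closure puts them inside the braces.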
For the inputStream.eachLine{}, that is a method added to Groovy's InputStream that allows you to read one line at a time; it injects the line into the closure. In Java 8+, you can do this by creating a BufferedReader from an InputStreamReader using the incoming InputStream:
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
Then you can use the lines() method to get a Stream where each element is a line in the stream.
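As a rough illustration of the BufferedReader and lines() combination, the sketch below collects every line of an InputStream into a list. The class and method names are made up for the example, and UTF-8 is an assumption about the content encoding.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

final class ReadLinesDemo {

    // Reads the stream line by line; line terminators are not included
    // in the returned strings.
    static List<String> readLines(InputStream inputStream) {
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            return br.lines().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing the charset explicitly avoids depending on the platform default, which can differ between machines.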
I believe the line.split() method you are using is from Java's String class, so you should be able to keep that, although it returns a String[], so your "a" variable will have to be declared as String[] (no "def" or "var" in Java 8). The same goes for the outputStream.write() call; I believe that will remain the same. Looks like you might end up with something like this (but I didn't try it):
flowFile = session.write(flowFile, {inputStream, outputStream ->
    BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
    br.lines().forEach(line -> {
        String[] a = line.split("\\|", -1);
        outputStream.write("SOMETHING WITH a[]".getBytes(StandardCharsets.UTF_8));
    });
});
It doesn't look like you're using the "a" array in your output, I imagine you'd changed it to test things out before you pasted it here. If you have trouble using "a" in your Java code while trying to output something, reply to this answer and I will try to help you through it.
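For what using "a" might look like, here is a small sketch of split("\\|", -1) on a pipe-delimited line. The class name, the method names, and the comma-joined output format are only illustrative assumptions, not something from the original script.

```java
final class SplitDemo {

    // The -1 limit keeps trailing empty fields; without it,
    // String.split drops them.
    static String[] fields(String line) {
        return line.split("\\|", -1);
    }

    // Illustrative output format: the same fields re-joined with commas,
    // followed by a newline.
    static String toCsvLine(String line) {
        String[] a = fields(line);
        return String.join(",", a) + "\n";
    }
}
```

For example, fields("x|y||") has four elements (the last two are empty strings), while "x|y||".split("\\|") without the limit has only two.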
Created 12-14-2018 01:05 PM
Hello @Matt. Here is the new code, as you advised. I had to use session.read(flowFile) to get the inputStream:
BufferedReader br = new BufferedReader(new InputStreamReader(session.read(flowFile), StandardCharsets.UTF_8));
flowFile = session.write(flowFile, outputStream -> {
    br.lines().forEach(line -> {
        try {
            outputStream.write((line + " SOMETHING").getBytes(StandardCharsets.UTF_8));
        } catch (IOException ex) {
            Logger.getLogger(MyProcessor.class.getName() + line).log(Level.SEVERE, null, ex);
        }
    });
});
session.transfer(flowFile, MY_RELATIONSHIP);
Error: I'm still getting the error below
ProcessSession.read(FlowFile) has not been closed
2018-12-14 13:35:55,707 WARN [Timer-Driven Process Thread-10] o.a.n.controller.tasks.ConnectableTask Administratively Yielding MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] due to uncaught Exception: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3147)
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3142)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2632)
at nifi.processors.nifi.MyProcessor.onTrigger(MyProcessor.java:111)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Created 12-14-2018 01:05 PM
Thanks @Matt for your help
I followed the steps you described; the code is below:
try {
    FlowFile flowFile = session.get();
    if ( flowFile == null ) {
        return;
    }
    InputStream inputStream = session.read(flowFile);
    BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
    flowFile = session.write(flowFile, outputStream -> {
        br.lines().forEach(line -> {
            try {
                outputStream.write((line + " SOMETHING WITH a[]").getBytes(StandardCharsets.UTF_8));
            } catch (IOException ex) {
                Logger.getLogger(MyProcessor.class.getName() + line).log(Level.SEVERE, null, ex);
            }
        });
    });
    inputStream.close();
    session.transfer(flowFile, MY_RELATIONSHIP);
} catch (IOException ex) {
}
I'm not sure how I should close the stream. The error is below:
2018-12-14 13:35:55,706 ERROR [Timer-Driven Process Thread-10] nifi.processors.nifi.MyProcessor MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] failed to process session due to java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed; Processor Administratively Yielded for 1 sec: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3147)
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3142)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2632)
at nifi.processors.nifi.MyProcessor.onTrigger(MyProcessor.java:111)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Created 12-14-2018 06:39 PM
You shouldn't need to move the inputStream handling out into a session.read(). There are two session.write() methods. One takes an OutputStreamCallback, and that's what you'd more likely use if you needed the input and output processing to be different. In your case, since you are overwriting the incoming flow file, you should use the StreamCallback version, which passes both the inputStream and the outputStream to the lambda. Then you can create the BufferedReader and also write the output in the same method.
Is there a reason you needed to move the inputStream part into a separate session.read()?
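Here is a minimal, untested-against-NiFi sketch of the StreamCallback approach. It reads and writes inside a single callback, so no separate session.read() stream is left open. InOutCallback and run() are hypothetical stand-ins so the example runs without NiFi; in a processor the same lambda body would presumably be passed as flowFile = session.write(flowFile, (inputStream, outputStream) -> { ... }). The " SOMETHING" suffix and the use of a[0] are placeholders.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in with the same shape as NiFi's StreamCallback.
@FunctionalInterface
interface InOutCallback {
    void process(InputStream inputStream, OutputStream outputStream) throws IOException;
}

final class RewriteLinesDemo {

    // Java lambda syntax: parameters in parentheses, then the arrow, then
    // the body in braces (not the Groovy form {in, out -> ...}).
    static final InOutCallback APPEND_SUFFIX = (inputStream, outputStream) -> {
        BufferedReader br = new BufferedReader(
                new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        String line;
        // A plain loop lets IOException propagate out of the callback;
        // br.lines().forEach(...) would need a try/catch inside the lambda.
        while ((line = br.readLine()) != null) {
            String[] a = line.split("\\|", -1);
            outputStream.write((a[0] + " SOMETHING\n").getBytes(StandardCharsets.UTF_8));
        }
    };

    // Hypothetical stand-in for session.write(flowFile, callback).
    static String run(String content, InOutCallback callback) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream in = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8))) {
            callback.process(in, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

With this stand-in, RewriteLinesDemo.run("x|y\nz|w\n", RewriteLinesDemo.APPEND_SUFFIX) writes one output line per input line, each starting with the first pipe-delimited field.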
Created 12-14-2018 08:25 PM
No reason, actually. I tried to use the code provided above, but it didn't recognise the inputStream object:
flowFile = session.write(flowFile, {inputStream, outputStream ->
    BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
    br.lines().forEach(line -> {
        String[] a = line.split("\\|", -1);
        outputStream.write("SOMETHING WITH a[]".getBytes(StandardCharsets.UTF_8));
    });
});