Created 08-28-2023 01:20 AM
when I am using PutFile Processor and ExecuteScript Processor,the object in the flow between them stuck for longer time than expected and processed many times.
this case happend only between these 2 processors only.
even if I set the expiration time for the flow and set it to 1 second, and scheduled the run of ExecuteScript to 4 sec, still processing the flow at least 2 times.
which cause sending the file to the API many times.
the version of the NIFI I am usnig is: NIFI v1.18, also I tried to upgrade and use version 1.22, but this issue still as is.
The script I am using in ExecuteScript is:
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.OutputStreamCallback
def flowFile = session.create()
def filePath = 'E:/tmp/tmp_file.txt'
def file = new File(filePath)
if (file.exists()) {
def fileContent = IOUtils.toByteArray(new FileInputStream(file))
flowFile = session.write(flowFile, { outputStream ->
outputStream.write(fileContent)
} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)
} else {
session.remove(flowFile)
// Handle file not found or other error
}
Created 08-30-2023 09:03 AM
@MukaAddA
Your issue is 100% in your script.
The upstream queued FlowFile is being used as the trigger for execution of the ExecuteScript processor. Instead of reading the upstream FlowFile in your script, you are creating a new FlowFile. So I think what is happening here you have both the new FlowFile generated by your script along with the original FlowFile being passed to the downstream Success relationship.
You may find the following article helpful:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.
Thank you,
Matt
Created 08-29-2023 01:02 AM
Up
Created 08-29-2023 02:01 AM
@MukaAddA, I see that you are linking two SUCCESS queues to your PutFile. Your PutFile will take all the items from both queues in a random order I assume. Try removing one of the success queues and see what happens. Besides that, see how you configured PutFile to handle the files with the same name, especially if you are using the name further in your processing.
In addition, set you processor on Debug and see what it displays and maybe you get a hint from there regarding you problem.
Created 08-30-2023 01:39 AM
there are no 2 success queues to PutFile, one is incoming to PutFile and the second outgoing to ExecuteScript and the third outgoing from ExecuteScript.
Created 08-30-2023 09:03 AM
@MukaAddA
Your issue is 100% in your script.
The upstream queued FlowFile is being used as the trigger for execution of the ExecuteScript processor. Instead of reading the upstream FlowFile in your script, you are creating a new FlowFile. So I think what is happening here you have both the new FlowFile generated by your script along with the original FlowFile being passed to the downstream Success relationship.
You may find the following article helpful:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.
Thank you,
Matt
Created 08-31-2023 02:56 AM
Many thanks @MattWho
Yes this is my issue, but the problem I tried to change my above code based on below:
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile)return
def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
// Do something with text here
} as InputStreamCallback)
to make it reading the flow instead of creating new flow, but a lots of errors
any suggestions will be helpful for me
thanks in advance
Created 09-06-2023 12:05 PM
@MukaAddA
Sorry, writing such script is not a strong area for me. I just happened to notice you were doing a session.create instead of a session.get. You may get better help by raising a new question on how to create a script to be executed by the ExecuteScript processor to accomplish your use case and provide details on that use case. I am sure there are others in the community that are good at writing such scripts.
Matt