
FlowFile stuck between PutFile and ExecuteScript for longer than expected

Explorer

When I use the PutFile and ExecuteScript processors, the FlowFile in the connection between them stays queued for longer than expected and is processed multiple times.
This happens only between these two processors.
Even if I set the FlowFile expiration to 1 second and schedule ExecuteScript to run every 4 seconds, the FlowFile is still processed at least twice,
which causes the file to be sent to the API multiple times.
I am using NiFi 1.18. I also tried upgrading to 1.22, but the issue persists.

The script I am using in ExecuteScript is:

import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.create()

def filePath = 'E:/tmp/tmp_file.txt'
def file = new File(filePath)

if (file.exists()) {
    def fileContent = IOUtils.toByteArray(new FileInputStream(file))

    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(fileContent)
    } as OutputStreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
} else {
    session.remove(flowFile)
    // Handle file not found or other error
}

[Screenshot: 2023-08-28 11_07_11-NotificationsWindow.png]

1 ACCEPTED SOLUTION

Master Mentor

@MukaAddA 

Your issue is 100% in your script.  
The upstream queued FlowFile is being used as the trigger for execution of the ExecuteScript processor. Instead of reading the upstream FlowFile in your script, you are creating a new FlowFile. So I think what is happening here is that you have both the new FlowFile generated by your script and the original FlowFile being passed to the downstream Success relationship.

You may find the following article helpful:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.

Thank you,

Matt
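
As a rough illustration of the change described above, the original script could take the queued FlowFile with session.get() rather than calling session.create(). This is only a sketch, not a tested fix: it assumes the script runs in ExecuteScript with the Groovy engine, where `session` and `REL_SUCCESS` are provided by the processor, and it keeps the file path and the "remove on missing file" behavior from the question.

```groovy
// Sketch of an ExecuteScript (Groovy) script body.
// `session` and `REL_SUCCESS` are bound by ExecuteScript, not defined here.
import org.apache.nifi.processor.io.OutputStreamCallback

// Take the queued upstream FlowFile instead of creating a new one.
def flowFile = session.get()
if (!flowFile) return   // nothing queued, nothing to do

// Path copied from the original post.
def file = new File('E:/tmp/tmp_file.txt')

if (file.exists()) {
    // Overwrite the FlowFile content with the bytes of the file on disk.
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(file.bytes)
    } as OutputStreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
} else {
    // File not found: drop the incoming FlowFile, as the original script did
    // with its newly created one.
    session.remove(flowFile)
}
```

With this shape, each run takes one FlowFile from the incoming queue and either transfers or removes it, so the same FlowFile should not be left queued to trigger the processor again.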

 


6 REPLIES 6

Explorer

Up

avatar

@MukaAddA, I see that you are linking two SUCCESS queues to your PutFile. I assume your PutFile will take the items from both queues in no particular order. Try removing one of the success queues and see what happens. Besides that, check how you configured PutFile to handle files with the same name, especially if you use the name later in your processing.

In addition, set your processor to Debug and see what it displays; you may get a hint about your problem from there.

Explorer

There are not two success queues going into PutFile: one connection comes into PutFile, the second goes from PutFile to ExecuteScript, and the third goes out of ExecuteScript.


Explorer

Many thanks @MattWho 

Yes, this is my issue. I tried to change my code above based on the following:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile)return
def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    // Do something with text here
} as InputStreamCallback)

to make it read the incoming FlowFile instead of creating a new one, but I get a lot of errors.

Any suggestions would be helpful.

Thanks in advance.

Master Mentor

@MukaAddA 

Sorry, writing such scripts is not a strong area for me. I just happened to notice you were doing a session.create instead of a session.get. You may get better help by raising a new question on how to write a script for the ExecuteScript processor that accomplishes your use case, with details on that use case. I am sure there are others in the community who are good at writing such scripts.

Matt
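
The error messages for the read-based attempt are not shown in the thread, so the cause can only be guessed. One likely candidate is that a FlowFile obtained with session.get() has to be transferred or removed before the script finishes, and the snippet does neither. A minimal sketch under that assumption follows; `session` and `REL_SUCCESS` are provided by ExecuteScript, and the explicit InputStreamCallback import is added to be safe and may be redundant.

```groovy
// Sketch of an ExecuteScript (Groovy) script body that reads the incoming
// FlowFile and then passes it on. `session` and `REL_SUCCESS` are bound by
// ExecuteScript, not defined here.
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (!flowFile) return   // nothing queued, nothing to do

def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback.
session.read(flowFile, { inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)

// Do something with text here.

// Every FlowFile taken from the queue must be transferred or removed.
session.transfer(flowFile, REL_SUCCESS)
```

If the errors remain, posting the exact messages from the processor bulletin or nifi-app.log would make it easier to narrow down the cause.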