Support Questions

Find answers, ask questions, and share your expertise

Execute Script ProcessSession.read has not been closed

avatar
Contributor

Hi, I am trying ro read from a json file and create flowfiles with the kews as attributes and one flowfile per value in the list.

 

The json file looks something like this:

{

"key1": null,
"key2": null,
"list": ["foo", "bar"],
"key3": 1

}

 

This is my execute script:

 

 

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor.io import InputStreamCallback

flowFile = session.get() 
if (flowFile != None):
    
    stream_content = session.read(flowFile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_data = json.loads(text_content)
	
    
    for i in json_data["list"]:
        
        flowFile2 = session.create()
        flowFile2 = session.putAttribute(flowFile2, "key3", str(json_data["key3"]))
        flowFile2 = session.putAttribute(flowFile2, "key1", json_data["key1"])
        flowFile2 = session.putAttribute(flowFile2, "list_value", i)
    
        session.transfer(flowFile2, REL_SUCCESS)

    session.remove(flowFile)

 

 

Now I keep getting an error that I need to close the process session first because my original flowfile is still in use. How can I iterate over the list to create multiple flowfiles properly? I couldn't find any similar case or example of this sort. Thanks for tips and help.

 

Edit: So I changed my code to create a dictionnary in order to use a new variable name for each flowfile created, but now I keep getting a key error when I want to transfer them to the success relationship.

 

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor.io import InputStreamCallback

flowFile = session.get() 
if (flowFile != None):

	stream_content = session.read(flowFile)
	text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
	json_data = json.loads(text_content)
	
	d = {}
	for x in range(len(json_data["list"])):
		d["flowfile{0}".format(x)] = json_data['list'][x]
	
	for file in d:
		file = session.create()
		file = session.putAttribute(file, "key3", str(json_data["key3"]))
		file = session.putAttribute(file, "key2", json_data["key2"])
		file = session.putAttribute(file, "list_value", d[file])
		session.transfer(file, REL_SUCCESS)
	
session.remove(flowFile)

 

1 ACCEPTED SOLUTION

avatar

@Fredi The solution you are looking for, is to use this line of code in your script during a loop whenever you want to send the current flowfile content. 

 session.commit()

 This will send a flowfile out.   This is little known, as this command is inferred at the end of the script, assuming 1 execution to one flowfile.   

 

Here is an example from my fraud detection demo, a while statement that does some counts, sends multiple flowfiles of good transactions, then in random iterations during 20 loops, sends some fraud transaction flowfiles.

 

# All processing code starts at this indent
while ticks < 20:
 ticks += 1
 fintran = create_fintran()   
 fintransaction =  json.dumps(fintran)
 #send_fintran(out_socket, json.dumps(fintran))
 #print(fintransaction)
 flowFile = session.create()
 flowFile = session.write(flowFile, WriteContentCallback(fintransaction))
 session.transfer(flowFile, REL_SUCCESS)
 session.commit()

 sleep(DELAY)
 if ticks > fraud_tick:
    fraudtran = create_fraudtran(fintran)
    fraudfintransaction=json.dumps(fraudtran)
    #send_fintran(out_socket, json.dumps(fraudtran))
    #print(fraudfintransaction)
    flowFile2 = session.create()
    flowFile2 = session.write(flowFile2, WriteContentCallback(fraudfintransaction))
    session.transfer(flowFile2, REL_SUCCESS)
    session.commit()
    fraud_tick = random.randint(FRAUD_TICK_MIN, FRAUD_TICK_MAX)

 

View solution in original post

4 REPLIES 4

avatar

@Fredi,

I am not an expert when it comes to using ExecuteScript in NiFi, as I mostly go for ExecuteStreamCommand, but I really recommend your to have a look on the following two links (until somebody with far more experience provides you with an answer), as they explain everything you need to know when it comes to executing a Jython script in NiFi:

See Part 2 for I/O on FlowFiles: https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018

See Part 1 for FlowFile Creation: https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922

If I were to look at your code and the examples provided by @mburgess, you are missing some important steps and that might be the cause of your error.

avatar
Contributor

Hi @cotopaul, thanks for your remark. I have seen the cookbooks, but there don't seem to be examples for creating multiple flowfiles from one incoming flowfile. As you can see from my edit I have advanced a bit, Also, I don't even want to generate content (I do that later with other processors). I just want to create flowfiles with some custom attributes with different values.

 

 

avatar

@Fredi The solution you are looking for, is to use this line of code in your script during a loop whenever you want to send the current flowfile content. 

 session.commit()

 This will send a flowfile out.   This is little known, as this command is inferred at the end of the script, assuming 1 execution to one flowfile.   

 

Here is an example from my fraud detection demo, a while statement that does some counts, sends multiple flowfiles of good transactions, then in random iterations during 20 loops, sends some fraud transaction flowfiles.

 

# All processing code starts at this indent
while ticks < 20:
 ticks += 1
 fintran = create_fintran()   
 fintransaction =  json.dumps(fintran)
 #send_fintran(out_socket, json.dumps(fintran))
 #print(fintransaction)
 flowFile = session.create()
 flowFile = session.write(flowFile, WriteContentCallback(fintransaction))
 session.transfer(flowFile, REL_SUCCESS)
 session.commit()

 sleep(DELAY)
 if ticks > fraud_tick:
    fraudtran = create_fraudtran(fintran)
    fraudfintransaction=json.dumps(fraudtran)
    #send_fintran(out_socket, json.dumps(fraudtran))
    #print(fraudfintransaction)
    flowFile2 = session.create()
    flowFile2 = session.write(flowFile2, WriteContentCallback(fraudfintransaction))
    session.transfer(flowFile2, REL_SUCCESS)
    session.commit()
    fraud_tick = random.randint(FRAUD_TICK_MIN, FRAUD_TICK_MAX)

 

avatar
Contributor

@steven-matison

Thanks man. It is true, the session.commit() method can be found in the abstract processor class, which is why I did not think of adding it. This helped me a lot! Also I needed to close the Inputstream with

 

IOUtils.closeQuietly(stream_content)

 

Thirdly I had to use the enumerate function for the dictionnary, because it couldn't read the line

 

 

 

file = session.putAttribute(file, "list_value", d[file])

 

 

 

So I just filled the dict with empty values and used session.putAttribute(file, "list_value", json_data['list'][i])

It is ugly, but works at least.