Execute Script ProcessSession.read has not been closed

Explorer

Hi, I am trying to read from a JSON file and create flowfiles with the keys as attributes, one flowfile per value in the list.

 

The json file looks something like this:

{

"key1": null,
"key2": null,
"list": ["foo", "bar"],
"key3": 1

}

 

This is my execute script:

 

 

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor.io import InputStreamCallback

flowFile = session.get() 
if (flowFile != None):
    
    stream_content = session.read(flowFile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_data = json.loads(text_content)
	
    
    for i in json_data["list"]:
        
        flowFile2 = session.create()
        flowFile2 = session.putAttribute(flowFile2, "key3", str(json_data["key3"]))
        flowFile2 = session.putAttribute(flowFile2, "key1", json_data["key1"])
        flowFile2 = session.putAttribute(flowFile2, "list_value", i)
    
        session.transfer(flowFile2, REL_SUCCESS)

    session.remove(flowFile)

 

 

Now I keep getting an error that I need to close the process session first because my original flowfile is still in use. How can I iterate over the list to create multiple flowfiles properly? I couldn't find any similar case or example of this sort. Thanks for tips and help.

 

Edit: So I changed my code to create a dictionary in order to use a new variable name for each flowfile created, but now I keep getting a KeyError when I want to transfer them to the success relationship.

 

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor.io import InputStreamCallback

flowFile = session.get() 
if (flowFile != None):

	stream_content = session.read(flowFile)
	text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
	json_data = json.loads(text_content)
	
	d = {}
	for x in range(len(json_data["list"])):
		d["flowfile{0}".format(x)] = json_data['list'][x]
	
	for file in d:
		file = session.create()
		file = session.putAttribute(file, "key3", str(json_data["key3"]))
		file = session.putAttribute(file, "key2", json_data["key2"])
		file = session.putAttribute(file, "list_value", d[file])
		session.transfer(file, REL_SUCCESS)
	
session.remove(flowFile)
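
A note on the KeyError in the edited script: it most likely comes from rebinding the loop variable. Inside the loop, `file` first holds a dictionary key such as "flowfile0", then gets overwritten with the FlowFile returned by session.create(), so `d[file]` looks up an object that was never a key. The plain-Python sketch below reproduces this without NiFi; `object()` is only a hypothetical stand-in for a FlowFile, and `values` stands in for json_data["list"].

```python
values = ["foo", "bar"]  # stands in for json_data["list"]

# Same shape as the dictionary built in the edited script.
d = {}
for x in range(len(values)):
    d["flowfile{0}".format(x)] = values[x]


def broken_lookup():
    """Mirror the failing loop: the key is overwritten before it is used."""
    results = []
    for name in d:
        name = object()  # stand-in for session.create(); the key is now lost
        results.append(d[name])  # raises KeyError: this object is not a key of d
    return results


def working_lookup():
    """Keep the list value in its own variable so it cannot be overwritten."""
    results = []
    for index, value in enumerate(values):
        flow_file = object()  # stand-in for session.create()
        results.append((index, value))
    return results
```

Iterating over the list directly (or over `d.items()`) avoids the lookup altogether, so the dictionary is not really needed.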

 

1 ACCEPTED SOLUTION

Super Collaborator

@Fredi The solution you are looking for is to call this line inside the loop in your script whenever you want to send out the current flowfile:

 session.commit()

This sends the flowfile out. It is little known because the commit is implied at the end of the script, on the assumption that one execution handles one flowfile.

 

Here is an example from my fraud detection demo: a while loop that does some counts, sends multiple flowfiles of good transactions, and, at random iterations within the 20 loops, also sends some fraud transaction flowfiles.

 

# All processing code starts at this indent
while ticks < 20:
    ticks += 1
    fintran = create_fintran()
    fintransaction = json.dumps(fintran)
    # send_fintran(out_socket, json.dumps(fintran))
    # print(fintransaction)
    flowFile = session.create()
    flowFile = session.write(flowFile, WriteContentCallback(fintransaction))
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

    sleep(DELAY)
    if ticks > fraud_tick:
        fraudtran = create_fraudtran(fintran)
        fraudfintransaction = json.dumps(fraudtran)
        # send_fintran(out_socket, json.dumps(fraudtran))
        # print(fraudfintransaction)
        flowFile2 = session.create()
        flowFile2 = session.write(flowFile2, WriteContentCallback(fraudfintransaction))
        session.transfer(flowFile2, REL_SUCCESS)
        session.commit()
        fraud_tick = random.randint(FRAUD_TICK_MIN, FRAUD_TICK_MAX)

 


4 REPLIES 4

Super Collaborator

@Fredi,

I am not an expert when it comes to using ExecuteScript in NiFi, as I mostly go for ExecuteStreamCommand, but I really recommend that you have a look at the following two links (until somebody with far more experience provides you with an answer), as they explain everything you need to know about executing a Jython script in NiFi:

See Part 2 for I/O on FlowFiles: https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018

See Part 1 for FlowFile Creation: https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922

Comparing your code with the examples provided by @mburgess, you are missing some important steps, and that might be the cause of your error.

Explorer

Hi @cotopaul, thanks for your remark. I have seen the cookbooks, but there don't seem to be any examples of creating multiple flowfiles from one incoming flowfile. As you can see from my edit, I have advanced a bit. Also, I don't even want to generate content (I do that later with other processors). I just want to create flowfiles with some custom attributes that have different values.

 

 

Explorer

@steven-matison

Thanks, man. It is true: the session.commit() method can be found in the abstract processor class, which is why I did not think of adding it myself. This helped me a lot! I also needed to close the InputStream with

 

IOUtils.closeQuietly(stream_content)

 

Thirdly, I had to use the enumerate function for the dictionary, because the script could not get past the line

file = session.putAttribute(file, "list_value", d[file])

So I just filled the dict with empty values and used session.putAttribute(file, "list_value", json_data['list'][i])

It is ugly, but works at least.
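
For readers who land here later, the pieces from this thread (close the stream returned by session.read, loop over the list directly, set string attributes on each new flowfile) could be combined roughly as follows. This is only a sketch under some assumptions, not the script that was actually deployed: the function names are made up for illustration, null JSON values are skipped, and other values are converted to strings because NiFi attribute values are strings. It does not call session.commit() itself, relying on the commit at the end of the script that the accepted answer mentions.

```python
import json


def as_text(value):
    """Return value unchanged if it is already text, else its string form."""
    return value if isinstance(value, type(u"")) else str(value)


def build_attribute_sets(json_text):
    """Return one attribute dict per entry in the JSON "list" field."""
    data = json.loads(json_text)
    # Shared attributes: every top-level key except "list".
    # Assumption: null values are skipped rather than written as attributes.
    shared = {}
    for key, value in data.items():
        if key != "list" and value is not None:
            shared[key] = as_text(value)
    attribute_sets = []
    for item in data["list"]:
        attrs = dict(shared)
        attrs["list_value"] = as_text(item)
        attribute_sets.append(attrs)
    return attribute_sets


def emit_flowfiles(session, rel_success, attribute_sets):
    """Create, tag and transfer one new flowfile per attribute dict."""
    for attrs in attribute_sets:
        child = session.create()
        for key, value in attrs.items():
            child = session.putAttribute(child, key, value)
        session.transfer(child, rel_success)


def process(session, rel_success):
    """Split one incoming flowfile into one flowfile per list value."""
    # Jython-only imports, kept local so the helpers above run without NiFi.
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets

    flow_file = session.get()
    if flow_file is None:
        return
    stream = session.read(flow_file)
    try:
        text = IOUtils.toString(stream, StandardCharsets.UTF_8)
    finally:
        stream.close()  # an unclosed stream triggers the "has not been closed" error
    emit_flowfiles(session, rel_success, build_attribute_sets(text))
    session.remove(flow_file)

# Inside ExecuteScript the last line of the script would be:
# process(session, REL_SUCCESS)
```

Keeping the JSON handling in its own function means it can be tried in an ordinary Python shell before pasting the script into the processor.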
