Created on 05-31-2023 12:18 AM - edited 05-31-2023 03:16 AM
Hi, I am trying to read from a JSON file and create flowfiles with the keys as attributes, one flowfile per value in the list.
The json file looks something like this:
{
"key1": null,
"key2": null,
"list": ["foo", "bar"],
"key3": 1
}
This is my ExecuteScript (Jython) code:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor.io import InputStreamCallback
flowFile = session.get()
if (flowFile != None):
    stream_content = session.read(flowFile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_data = json.loads(text_content)
    for i in json_data["list"]:
        flowFile2 = session.create()
        flowFile2 = session.putAttribute(flowFile2, "key3", str(json_data["key3"]))
        flowFile2 = session.putAttribute(flowFile2, "key1", json_data["key1"])
        flowFile2 = session.putAttribute(flowFile2, "list_value", i)
        session.transfer(flowFile2, REL_SUCCESS)
    session.remove(flowFile)
Now I keep getting an error saying that I need to close the process session first because my original flowfile is still in use. How can I iterate over the list to create multiple flowfiles properly? I couldn't find any similar case or example. Thanks for any tips and help.
Edit: I changed my code to build a dictionary so that each created flowfile gets its own variable name, but now I keep getting a KeyError when I try to transfer them to the success relationship.
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor.io import InputStreamCallback
flowFile = session.get()
if (flowFile != None):
    stream_content = session.read(flowFile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_data = json.loads(text_content)
    d = {}
    for x in range(len(json_data["list"])):
        d["flowfile{0}".format(x)] = json_data['list'][x]
    for file in d:
        file = session.create()
        file = session.putAttribute(file, "key3", str(json_data["key3"]))
        file = session.putAttribute(file, "key2", json_data["key2"])
        file = session.putAttribute(file, "list_value", d[file])
        session.transfer(file, REL_SUCCESS)
    session.remove(flowFile)
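The KeyError can be reproduced without NiFi. In `for file in d:`, the first line of the loop body rebinds `file` from the dictionary key (`"flowfile0"`, ...) to the newly created FlowFile, so `d[file]` then looks up the FlowFile object instead of the key. A minimal sketch, where `FakeFlowFile`, `buggy_lookup` and `fixed_lookup` are hypothetical names and `FakeFlowFile` stands in for whatever `session.create()` returns:

```python
# Plain-Python reproduction of the KeyError from the edited script; no NiFi needed.


class FakeFlowFile(object):
    """Hypothetical stand-in for the object returned by session.create()."""
    pass


def buggy_lookup(d):
    for file in d:
        file = FakeFlowFile()  # rebinds `file`: the dict key is lost here
        return d[file]         # looks up the FakeFlowFile object, which is not a key


def fixed_lookup(d):
    pairs = []
    for key in d:
        flow_file = FakeFlowFile()         # separate name, so `key` keeps the dict key
        pairs.append((flow_file, d[key]))  # in NiFi: putAttribute(flow_file, "list_value", d[key])
    return pairs


d = {"flowfile0": "foo", "flowfile1": "bar"}
```

Keeping the key and the FlowFile in separate variables, or iterating over `json_data["list"]` directly, avoids the error.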
Created 05-31-2023 03:12 AM
@Fredi,
I am not an expert when it comes to using ExecuteScript in NiFi, as I mostly use ExecuteStreamCommand, but I really recommend you have a look at the following two links (until somebody with far more experience provides an answer), as they explain everything you need to know about executing a Jython script in NiFi:
See Part 2 for I/O on FlowFiles: https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
See Part 1 for FlowFile Creation: https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
Comparing your code with the examples provided by @mburgess, you are missing some important steps, and that might be the cause of your error.
Created 05-31-2023 03:26 AM
Hi @cotopaul, thanks for your remark. I have seen the cookbooks, but there don't seem to be any examples of creating multiple flowfiles from one incoming flowfile. As you can see from my edit, I have made some progress. Also, I don't even want to generate content (I do that later with other processors); I just want to create flowfiles with custom attributes holding different values.
Created 05-31-2023 05:35 AM
@Fredi The solution you are looking for is to call the following line inside your loop whenever you want to send out the current flowfile:
session.commit()
This commits the session and sends the flowfile out. It is little known because ExecuteScript performs this commit implicitly at the end of the script, assuming one execution per flowfile.
Here is an example from my fraud detection demo: a while loop that runs 20 iterations, sending a flowfile for each good transaction and, at random iterations, an additional fraud transaction flowfile.
# All processing code starts at this indent
while ticks < 20:
    ticks += 1
    fintran = create_fintran()
    fintransaction = json.dumps(fintran)
    #send_fintran(out_socket, json.dumps(fintran))
    #print(fintransaction)
    flowFile = session.create()
    flowFile = session.write(flowFile, WriteContentCallback(fintransaction))
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
    sleep(DELAY)
    if ticks > fraud_tick:
        fraudtran = create_fraudtran(fintran)
        fraudfintransaction = json.dumps(fraudtran)
        #send_fintran(out_socket, json.dumps(fraudtran))
        #print(fraudfintransaction)
        flowFile2 = session.create()
        flowFile2 = session.write(flowFile2, WriteContentCallback(fraudfintransaction))
        session.transfer(flowFile2, REL_SUCCESS)
        session.commit()
        fraud_tick = random.randint(FRAUD_TICK_MIN, FRAUD_TICK_MAX)
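Stripped of the demo-specific helpers (`create_fintran`, `WriteContentCallback`, etc.), the pattern in the demo loop is: create, populate, transfer, commit, once per item. A minimal attribute-only sketch, where `emit_each` and `attr_name` are hypothetical names. It assumes no FlowFile obtained with `session.get()` is still pending when `commit()` runs, since NiFi expects every FlowFile in the session to be transferred or removed before a commit:

```python
# Hypothetical attribute-only version of the demo pattern: each FlowFile is
# committed (and therefore released downstream) as soon as it is transferred,
# instead of when the script ends.


def emit_each(session, items, rel_success, attr_name="list_value"):
    count = 0
    for item in items:
        flow_file = session.create()
        flow_file = session.putAttribute(flow_file, attr_name, item)  # item must be a string
        session.transfer(flow_file, rel_success)
        session.commit()  # sends this FlowFile downstream now
        count += 1
    return count
```

Inside ExecuteScript this would be called as `emit_each(session, json_data["list"], REL_SUCCESS)`, with `session` and `REL_SUCCESS` being the variables the processor binds for the script.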
Created on 05-31-2023 06:20 AM - edited 05-31-2023 06:20 AM
Thanks man. True, session.commit() is called in the abstract processor class, which is why I did not think of adding it myself. This helped me a lot! I also needed to close the InputStream with
IOUtils.closeQuietly(stream_content)
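A variant of closing the stream that also covers the case where reading or decoding throws: wrap the read in `try`/`finally`. A minimal sketch, where `read_content` and its `to_text` parameter are hypothetical; `to_text` is whatever turns the stream into a string, for example `IOUtils.toString` from the script's existing imports:

```python
# Read a FlowFile's content and always close the InputStream, even if the
# conversion to text raises, so the FlowFile can later be transferred or removed.


def read_content(session, flow_file, to_text):
    stream = session.read(flow_file)  # InputStream over the FlowFile's content
    try:
        return to_text(stream)
    finally:
        stream.close()  # runs on success and on error
```

In the script above this would read `text_content = read_content(session, flowFile, lambda s: IOUtils.toString(s, StandardCharsets.UTF_8))`.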
Thirdly, I had to use the enumerate function with the dictionary, because the following line failed:
file = session.putAttribute(file, "list_value", d[file])
So I just filled the dict with empty values and used session.putAttribute(file, "list_value", json_data['list'][i]) instead.
It is ugly, but at least it works.
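For reference, a consolidated sketch of the approach described in this thread that skips the dictionary by iterating over the list directly. It uses only `session.create(parent)`, `session.putAttribute`, `session.transfer` and `session.remove`, and relies on ExecuteScript committing the session when the script returns. `split_to_flowfiles`, `to_attribute` and `list_key` are hypothetical names, and copying every top-level scalar key while turning JSON null into an empty string are assumptions of this sketch:

```python
import json

# Hypothetical helper: one child FlowFile per entry of the JSON list.
# `text` is the incoming FlowFile's content, read with the InputStream
# already closed. Assumption: JSON null becomes an empty attribute value.

try:
    string_types = basestring  # Jython 2.7, the engine ExecuteScript uses for Python
except NameError:
    string_types = str         # Python 3


def to_attribute(value):
    if value is None:
        return ""
    if isinstance(value, string_types):
        return value
    return json.dumps(value)  # e.g. 1 -> "1", True -> "true"


def split_to_flowfiles(session, parent, text, rel_success, list_key="list"):
    json_data = json.loads(text)
    # Attributes shared by every child: all top-level keys except the list itself
    shared = {k: to_attribute(v) for k, v in json_data.items() if k != list_key}
    children = []
    for value in json_data[list_key]:
        child = session.create(parent)  # child inherits the parent's attributes and lineage
        for name, attr in shared.items():
            child = session.putAttribute(child, name, attr)
        child = session.putAttribute(child, "list_value", to_attribute(value))
        session.transfer(child, rel_success)
        children.append(child)
    session.remove(parent)  # drop the original once all children exist
    return children
```

Inside ExecuteScript, after reading `text_content` and closing the stream, the call would be `split_to_flowfiles(session, flowFile, text_content, REL_SUCCESS)`. Using `session.create(parent)` rather than `session.create()` keeps provenance lineage from the original flowfile.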