Created 08-01-2024 07:31 AM
Hello, I have a problem with the ExecuteScript processor. I have an incoming flowfile with JSON content like this:
{
  "setName": "name",
  "optionalDict": {},
  "directories": [
    {
      "path": "path/to/file",
      "fileName": "filename",
      "id": 1
    }
  ],
  "setId": 12
}
My script is the following:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor.io import InputStreamCallback

flowFile = session.get()
if (flowFile != None):
    stream_content = session.read(flowFile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    IOUtils.closeQuietly(stream_content)
    json_data = json.loads(text_content)
    session.remove(flowFile)
    d = {}
    for x in range(len(json_data['directories'])):
        d["flowfile{0}".format(x)] = None
    for i, file in enumerate(d):
        file = session.create()
        file = session.putAttribute(file, "setId", str(json_data['setId']))
        file = session.putAttribute(file, "setName", json_data["setName"])
        file = session.putAttribute(file, "absolute.path", json_data['directories'][i]["path"])
        if json_data['optionalDict'] is not None:
            if bool(json_data['optionalDict']):
                file = session.putAttribute(file, "value1", str(json_data['set_entity_relation']['intValue']))
                file = session.putAttribute(file, "value2", json_data['set_entity_relation']['stringValue'])
        session.transfer(file, REL_SUCCESS)
    session.commit()
My goal is to emit one flowfile per entry in "directories" and write some values to each flowfile's attributes. This works fine when "optionalDict" is not null and has the necessary entries. When it is an empty dict, though, the script still tries to write the optional values and fails, since there are none. When I run the same Python code locally, the if clauses prevent this.
Why does the ExecuteScript processor ignore the two if clauses? I have tried different ways to stop it from executing the optional steps, but nothing works. It also reports the error on the line of the if clauses, when in reality it is two lines further down. This is the error message:
TypeError: putAttribute(): 3rd arg can't be coerced to String
Well, it shouldn't try to. What could I try to do?
Created on 08-07-2024 04:08 AM - edited 08-07-2024 04:12 AM
Hi @Fredi ,
It's hard to say what is happening without seeing the data where optionalDict is not empty; you only provided data where it is empty. Keep in mind that this is not true Python but a flavor of it called Jython, so it is not an apples-to-apples comparison with Python.
If I may, I would suggest two alternatives:
1 - Since Jython scripting is going to be deprecated starting with version 2.0, I would recommend using Groovy instead. Parsing JSON in Groovy is actually much simpler than in Jython. I'm not sure which version you are using, but there is a dedicated processor for running Groovy scripts called ExecuteGroovyScript, which is probably faster than the traditional ExecuteScript (which you can still use). Based on your input, the script looks like this:
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
// Explicit imports for the two callback interfaces used below
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.StreamCallback

flowFile = session.get()
if(!flowFile) return

def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)

def jsonSlurper = new JsonSlurper()
def jsonData = jsonSlurper.parseText(text)

if(jsonData.directories[0])
{
    session.remove(flowFile)
    // Create one flowfile per directory entry
    jsonData.directories.each { d ->
        def newflowfile = session.create()
        // Write the directory entry as the content of the new flowfile
        newflowfile = session.write(newflowfile, {inputStream, outputStream ->
            outputStream.write(JsonOutput.toJson(d).getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
        newflowfile = session.putAttribute(newflowfile, "setId", jsonData.setId.toString())
        newflowfile = session.putAttribute(newflowfile, "setName", jsonData.setName)
        newflowfile = session.putAttribute(newflowfile, "absolute.path", d.path)
        // An empty map is falsy in Groovy, so this block is skipped for {}
        if(jsonData.optionalDict)
        {
            newflowfile = session.putAttribute(newflowfile, "value1", jsonData.optionalDict.set_entity_relation.intValue.toString())
            newflowfile = session.putAttribute(newflowfile, "value2", jsonData.optionalDict.set_entity_relation.stringValue)
        }
        session.transfer(newflowfile, REL_SUCCESS)
    }
}
else session.transfer(flowFile, REL_FAILURE)
I have tried the script for both scenarios and it worked as expected.
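If you prefer to stay with Jython for now, the same guard can be sketched in plain Python. This is only a minimal sketch, not a tested NiFi script: the helper name build_attributes is hypothetical, and it assumes the optional values live under optionalDict.set_entity_relation, as in the Groovy script above. It converts every value to text up front, since putAttribute() only accepts strings, and it adds the optional attributes only when they are actually present:

```python
import json


def build_attributes(json_data, index):
    """Return the attribute dict for the directory entry at `index`.

    Every value is converted to text here, because putAttribute()
    only accepts strings.
    """
    entry = json_data["directories"][index]
    attributes = {
        "setId": str(json_data["setId"]),
        "setName": str(json_data["setName"]),
        "absolute.path": str(entry["path"]),
    }

    # Assumption: the optional values are nested under
    # optionalDict.set_entity_relation, as in the Groovy script.
    optional = json_data.get("optionalDict") or {}
    relation = optional.get("set_entity_relation") or {}
    if "intValue" in relation:
        attributes["value1"] = str(relation["intValue"])
    if "stringValue" in relation:
        attributes["value2"] = str(relation["stringValue"])
    return attributes


# The sample document from the question, with an empty optionalDict.
sample = json.loads("""
{
  "setName": "name",
  "optionalDict": {},
  "directories": [{"path": "path/to/file", "fileName": "filename", "id": 1}],
  "setId": 12
}
""")

attrs = build_attributes(sample, 0)
```

Inside the processor you would then loop over the returned dict and call session.putAttribute(file, key, value) for each pair. Note that in Jython 2.7, str() fails on non-ASCII text, so unicode() may be the safer conversion there.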
2 - The other alternative is to use the standard processors (the NiFi way) instead of a script. A script processor should be the last option, for cases where the out-of-the-box processors don't suffice, or where you need to improve performance because the flow has become very complicated and inefficient. For this I would use the following processors:
1 - EvaluateJsonPath to extract the common attributes: setId, setName, the optionalDict values 1 and 2, etc.
2 - SplitJson or QueryRecord on the directories array: this produces one flowfile per entry, and each flowfile carries the common attributes.
3 - EvaluateJsonPath again to extract each directory's attributes, even though they are already part of the flowfile content.
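To make the data flow of these three steps concrete, here is a plain-Python simulation. It is not NiFi code, only a sketch of what each step would hand to the next; the function name simulate_flow is hypothetical, and the JsonPath expressions in the comments are assumptions about how the processors would be configured:

```python
import json


def simulate_flow(text):
    """Mimic the three processor steps on one incoming JSON document.

    Returns a list of (attributes, content) pairs, one per directory.
    """
    data = json.loads(text)

    # Step 1: common attributes (assumed JsonPath: $.setId, $.setName).
    common = {"setId": str(data["setId"]), "setName": str(data["setName"])}

    results = []
    # Step 2: one output per element of the directories array
    # (assumed JsonPath: $.directories); each output inherits the
    # attributes of the incoming flowfile.
    for entry in data.get("directories", []):
        attributes = dict(common)
        # Step 3: per-directory attribute read from the split content
        # (assumed JsonPath: $.path).
        attributes["absolute.path"] = str(entry["path"])
        results.append((attributes, json.dumps(entry, sort_keys=True)))
    return results


incoming = json.dumps({
    "setName": "name",
    "optionalDict": {},
    "directories": [
        {"path": "a/b", "fileName": "f1", "id": 1},
        {"path": "c/d", "fileName": "f2", "id": 2},
    ],
    "setId": 12,
})

flowfiles = simulate_flow(incoming)
```

Each pair in flowfiles corresponds to one outgoing flowfile: the first element holds its attributes, the second its JSON content.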
Hopefully that helps. If it does please accept the solution.
Thanks