Member since
08-08-2018
1
Post
0
Kudos Received
0
Solutions
08-09-2018
01:14 AM
I have an issue with Nifi v1.6 i was trying to write small python script inside ExecuteScript processor :Although i was just calling session.read with Callback function: the error is org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.lang.IllegalStateException: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=1343dc70-6750-4269-af88-d02784c07a44,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1533643252945-1, container=default, section=1], offset=675, length=61],offset=0,name=21873824285366087,size=61] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed in <script> at line number Code as follows from org.apache.nifi.processors.script import ExecuteScript
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback
import sys, re
import traceback
from org.python.core.util import StringUtil
class SplitCallback(InputStreamCallback):
def __init__(self):
self.parentFlowFile = None
def process(self, inputStream):
try:
input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
input_list = input_text.split('\n')
# Create FlowFiles for array items
splits = []
splitFlowFile = session.create(self.parentFlowFile)
for record in input_list:
words = record.split(" ")
if words[1].lower() == "running" :
continue
else:
splitFlowFile = session.putAllAttributes(splitFlowFile, {'Application': words[0],'Status': words[1]})
splits.append(splitFlowFile)
if len(splits) > 0 :
for splitFlowFile in splits:
session.transfer(splitFlowFile, ExecuteScript.REL_FAILURE)
else:
session.transfer(splitFlowFile, ExecuteScript.REL_SUCCESS)
except:
traceback.print_exc(file=sys.stdout)
raise
finally:
if inputStream is not None :
inputStream.close()
session.commit()
parentFlowFile = session.get()
if parentFlowFile != None:
splitCallback = SplitCallback()
splitCallback.parentFlowFile = parentFlowFile
session.read(parentFlowFile,splitCallback)
... View more