Created 02-15-2018 12:24 PM
Hey there!
BACKGROUND: I'm looking to set up a NiFi scheduler to run some data transformations. We have clickstream data and need to aggregate it per user to compute various statistics, such as which reports are most popular and who the power users are.
SETUP: A QueryElasticsearchHttp processor fetches the clickstream data, but each flow file it emits contains only one record. These flow files are fed into an ExecuteScript processor that concatenates their contents into a single flow file. That file is then sent to a second ExecuteScript processor, which runs a Python script to produce the final data state.
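For the final step, here is a minimal sketch of the kind of aggregation described in the background, assuming the merged flow file is a JSON array of click records with hypothetical `user` and `report` fields (the real clickstream field names are not given in the question). It counts views per report and clicks per user with `collections.Counter`:

```python
import json
from collections import Counter


def summarize_clicks(merged_json, top_n=3):
    """Aggregate a JSON array of click records.

    Assumes each record has hypothetical 'user' and 'report' keys;
    adjust these to the actual clickstream schema.
    """
    records = json.loads(merged_json)
    report_views = Counter(r["report"] for r in records)  # views per report
    user_clicks = Counter(r["user"] for r in records)     # clicks per user
    return {
        "popular_reports": report_views.most_common(top_n),
        "power_users": user_clicks.most_common(top_n),
    }


if __name__ == "__main__":
    sample = json.dumps([
        {"user": "alice", "report": "sales"},
        {"user": "alice", "report": "sales"},
        {"user": "bob", "report": "inventory"},
        {"user": "alice", "report": "inventory"},
        {"user": "carol", "report": "sales"},
    ])
    print(summarize_clicks(sample, top_n=2))
```

"Power user" is taken here to mean "most clicks"; a different definition (distinct reports, sessions, time spent) would only change which key the second `Counter` is built from.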
PROBLEM: I'm getting a "Transfer relationship not specified" error from the ExecuteScript processor that concatenates the flow files.
```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processor.io import OutputStreamCallback
from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.nifi.processor.io import StreamCallback

flowFileList = session.get(100)  ##Get next 100 Rows
output = '{['  ##create what will be the new content string
ff = session.create()  ##Create a new flowfile
if not flowFileList.isEmpty():  ##Check that my list isn't empty
    for flowFile in flowFileList:
        reader = PyInputStreamCallback()  ##Create Reader
        session.read(flowFile, reader)  ##Readfile content
        output = output + reader.getText()  ##Append content to output string
        # Process each FlowFile here
    writer = PyOutputStreamCallback()  ##Create Writer
    output = output + ']}'  ## Append ending to output
    writer.setText(output)  ## Set text that will be written to flowfile
    ff = session.write(ff, writer)  ##Write to flowfile
    session.transfer(ff, REL_SUCCESS)  ##Send flowfile to success pipe

# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
    _text = None  ##Just a holder for text

    def __init__(self):
        pass

    def setText(self, text):
        self._text = text

    def process(self, outputStream):
        outputStream.write(bytearray(output.encode('utf-8')))  ##Write output to the flow file
# end class

# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
    _text = None  ##Just a holder for text

    def __init__(self):
        pass

    def getText(self):
        return self._text

    def process(self, inputStream):
        reader = InputStreamReader(inputStream)  ##Make Reader
        bufferedReader = BufferedReader(reader)  ##Make input buffer
        self._text = bufferedReader.readLine()  ## Read the file
# end class
```
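Separately from the transfer error, the content logic above has a few issues: `readLine()` returns only the first line of each flow file (and `None` for an empty one, which breaks the string concatenation), the records are appended without commas between them, and a `'{['` ... `']}'` wrapper is not valid JSON. A minimal pure-Python sketch of merging single-record JSON strings into one valid JSON array, independent of the NiFi API (the function name `merge_records` is hypothetical):

```python
import json


def merge_records(contents):
    """Merge single-record JSON strings into one JSON array string.

    Each element of `contents` is assumed to be the full text of one
    flow file (not just its first line). Empty or whitespace-only
    contents are skipped instead of producing invalid JSON.
    """
    records = [json.loads(text) for text in contents if text.strip()]
    return json.dumps(records)


if __name__ == "__main__":
    parts = [
        '{"user": "alice", "report": "sales"}',
        "",
        '{"user": "bob", "report": "inventory"}',
    ]
    print(merge_records(parts))
```

Parsing and re-serializing each record, rather than gluing strings together, means a malformed record fails loudly instead of silently corrupting the merged file. Inside the Jython callback, the whole content could be read with `IOUtils.toString(inputStream, StandardCharsets.UTF_8)`, which the script already imports, instead of `readLine()`.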
Created 02-16-2018 03:49 PM
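You also have to route the original flow files that `session.get(100)` returned. NiFi fails the commit with "transfer relationship not specified" if any flow file in the session has been neither transferred nor removed. For example, inside the `if` block after the loop: `session.transfer(flowFileList, REL_FAILURE)`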
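To see why the error appears, here is a toy model of the rule the answer relies on: every flow file the session hands out (via `get` or `create`) must be transferred or removed before the session commits. `ToySession`, `TransferNotSpecified` and `merge_batch` are hypothetical stand-ins written for illustration only; they are not NiFi's `ProcessSession` and only mimic its bookkeeping, with plain strings in place of flow files.

```python
class TransferNotSpecified(Exception):
    """Raised when a flow file reaches commit without a destination."""


class ToySession:
    """Hypothetical, simplified model of session bookkeeping (not the NiFi API)."""

    def __init__(self, queued):
        self._queue = list(queued)
        self._pending = set()  # taken or created, but not yet routed
        self.routed = {}       # flow file -> relationship name

    def get(self, n):
        batch, self._queue = self._queue[:n], self._queue[n:]
        self._pending.update(batch)
        return batch

    def create(self, name):
        self._pending.add(name)
        return name

    def transfer(self, flow_files, relationship):
        if isinstance(flow_files, str):  # single flow file
            flow_files = [flow_files]
        for ff in flow_files:
            self._pending.discard(ff)
            self.routed[ff] = relationship

    def remove(self, flow_files):
        for ff in flow_files:
            self._pending.discard(ff)

    def commit(self):
        # Any flow file still pending has no relationship: fail the commit.
        if self._pending:
            raise TransferNotSpecified(sorted(self._pending))


def merge_batch(session, route_originals):
    originals = session.get(100)
    merged = session.create("merged")
    session.transfer(merged, "success")
    if route_originals:
        session.transfer(originals, "failure")  # the fix from the answer
    session.commit()
```

Calling `merge_batch(ToySession(["ff1", "ff2"]), route_originals=False)` raises `TransferNotSpecified` for `ff1` and `ff2`, while `route_originals=True` commits cleanly. Removing the originals (`session.remove(flowFileList)` in the real script) satisfies the check equally well when they are not needed downstream. The question's script also calls `session.create()` before checking whether the list is empty, so if it ever runs with an empty queue the created flow file is left unrouted and triggers the same error; creating it inside the `if` block avoids that.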
Created 02-15-2018 03:57 PM
Why don't you use the MergeContent processor to concatenate the flow file contents?
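A sketch of MergeContent settings that would wrap the single-record flow files into one JSON array, written as a Python dict only for compactness (in the NiFi UI these are entered on the processor's Properties tab). The property names are MergeContent's; the values, in particular the entry counts and bin age, are assumptions to tune for the actual data volume. The helper `emulate_text_delimiters` is a hypothetical local model of what Binary Concatenation with a text header, demarcator and footer produces, used only to check that the result parses as JSON.

```python
import json

# Suggested MergeContent property values (counts and age are assumptions).
MERGE_CONTENT_PROPERTIES = {
    "Merge Strategy": "Bin-Packing Algorithm",
    "Merge Format": "Binary Concatenation",
    "Minimum Number of Entries": "100",
    "Maximum Number of Entries": "1000",
    "Max Bin Age": "5 min",        # flush a partially filled bin after this long
    "Delimiter Strategy": "Text",  # use the literal text values below
    "Header": "[",
    "Demarcator": ",",
    "Footer": "]",
}


def emulate_text_delimiters(contents, props=MERGE_CONTENT_PROPERTIES):
    """Hypothetical local model: header + contents joined by demarcator + footer."""
    return props["Header"] + props["Demarcator"].join(contents) + props["Footer"]


if __name__ == "__main__":
    merged = emulate_text_delimiters(['{"user": "alice"}', '{"user": "bob"}'])
    print(merged)
    print(json.loads(merged))
```

Unlike the hand-written script, MergeContent routes the original flow files itself (to its `original` relationship), so there is no transfer bookkeeping to get wrong. The result only parses if each incoming flow file holds exactly one complete JSON record.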