Support Questions

Find answers, ask questions, and share your expertise
Celebrating a Century of Connection: Our Community Reaches 100,000 Members! Thank you!

Attempting to Concat flowfiles for Data Transformations, [Elastic Search, Python, ExecuteScript]

New Contributor

Hey there!

BACKGROUND: I'm looking to setup a NiFi scheduler to run some data transformations. We have clickstream data and need to aggregate user data to find out which different statistics, such as which reports are most popular, and who are the power users.

SETUP: QueryElasticSearchHTTP processor is getting the click stream data, but the flow files are only 1 record at a time. These are being feed into an ExecuteScipt processor to concatenate the flowfile contents into a single flow file content. This file will be sent into another ExecuteScipt processor to run a python script that will produce the final data state.

PROBLEM: I'm getting a Transfer Relationship not specified error my ExecuteScript processor, that is concatenating flow files.

from import IOUtils
from java.nio.charset import StandardCharsets
from import BufferedReader, InputStreamReader
from import OutputStreamCallback
from import InputStreamCallback
from import StreamCallback

flowFileList = session.get(100) ##Get next 100 Rows
output = '{[' ##create what will be the new content string
ff = session.create() ##Create a new flowfile
if not flowFileList.isEmpty():  ##Check that my list isn't empty
    for flowFile in flowFileList:   
        reader = PyInputStreamCallback() ##Create Reader, reader) ##Readfile content
        output = output + reader.getText() ##Append content to output string       
        # Process each FlowFile here
    writer = PyOutputStreamCallback() ##Create Writer
    output = output + ']}' ## Append ending to output
    writer.setText(output) ## Set text that will be written to flowfile
    ff = session.write(ff, writer) ##Write to flowfile
    session.transfer(ff, REL_SUCCESS) ##Send flowfile to success pipe

# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
    _text = None  ##Just a holder for text

    def __init__(self):
    def setText(self, text) : 
        self._text = text

    def process(self, outputStream):
        outputStream.write(bytearray(output.encode('utf-8')))  ##Write output to the flow file
# end class

# Define a subclass of InputStreamCallback for use in
class PyInputStreamCallback(InputStreamCallback):
    _text = None##Just a holder for text

    def __init__(self):

    def getText(self) : 
        return self._text

    def process(self, inputStream):
        reader = InputStreamReader(inputStream) ##Make Reader
        bufferedReader = BufferedReader(reader) ##Make input buffer
        self._text = bufferedReader.readLine()  ## Read the file
# end class


New Contributor

You have to add a flowfile transfer for the old flowfiles. session.transfer(flowfilelist, REL_FAILURE)

View solution in original post



Why don't you use MergeContent processor to concatenate the flow-file content?

New Contributor

You have to add a flowfile transfer for the old flowfiles. session.transfer(flowfilelist, REL_FAILURE)