Attempting to Concat FlowFiles for Data Transformations [Elasticsearch, Python, ExecuteScript]

New Contributor

Hey there!

BACKGROUND: I'm looking to set up a NiFi scheduler to run some data transformations. We have clickstream data and need to aggregate user data to work out various statistics, such as which reports are most popular and who the power users are.

SETUP: A QueryElasticsearchHttp processor is pulling in the clickstream data, but the flow files come out one record at a time. These are fed into an ExecuteScript processor that concatenates the flow file contents into a single flow file. That flow file will then be sent to another ExecuteScript processor, which runs a Python script to produce the final data set.

PROBLEM: I'm getting a "Transfer Relationship not specified" error from my ExecuteScript processor that concatenates the flow files.

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processor.io import OutputStreamCallback
from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.nifi.processor.io import StreamCallback

# Subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
    _text = None  ## Holder for the text to be written

    def __init__(self):
        pass

    def setText(self, text):
        self._text = text

    def process(self, outputStream):
        outputStream.write(bytearray(self._text.encode('utf-8')))  ## Write the stored text to the flow file
# end class

# Subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
    _text = None  ## Holder for the text that was read

    def __init__(self):
        pass

    def getText(self):
        return self._text

    def process(self, inputStream):
        reader = InputStreamReader(inputStream, StandardCharsets.UTF_8)  ## Wrap the raw stream
        bufferedReader = BufferedReader(reader)  ## Buffer the reader
        self._text = bufferedReader.readLine()  ## Read the single-line flow file content
# end class

flowFileList = session.get(100)  ## Get up to the next 100 flow files
output = '{['  ## Start of what will be the new content string
ff = session.create()  ## Create a new flow file
if not flowFileList.isEmpty():  ## Check that the list isn't empty
    for flowFile in flowFileList:
        reader = PyInputStreamCallback()  ## Create a reader
        session.read(flowFile, reader)  ## Read the flow file content
        output = output + reader.getText()  ## Append content to the output string
    writer = PyOutputStreamCallback()  ## Create a writer
    output = output + ']}'  ## Append the ending to the output
    writer.setText(output)  ## Set the text that will be written to the flow file
    ff = session.write(ff, writer)  ## Write to the new flow file
    session.transfer(ff, REL_SUCCESS)  ## Send the new flow file to the success relationship



1 ACCEPTED SOLUTION

New Contributor

You have to add a transfer for the old flow files as well: session.transfer(flowFileList, REL_FAILURE)
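
In the posted script, that call goes next to the existing transfer of the merged flow file. Here is a minimal sketch of the tail of the script with that fix, reusing the classes and variables defined above; the else branch is my own addition, since the flow file from session.create() would otherwise trigger the same error whenever no input arrives (calling session.remove() on each original is an alternative if you don't want them routed anywhere):

if not flowFileList.isEmpty():
    for flowFile in flowFileList:
        reader = PyInputStreamCallback()
        session.read(flowFile, reader)
        output = output + reader.getText()
    writer = PyOutputStreamCallback()
    output = output + ']}'
    writer.setText(output)
    ff = session.write(ff, writer)
    session.transfer(ff, REL_SUCCESS)  ## The new, merged flow file
    session.transfer(flowFileList, REL_FAILURE)  ## The originals must also be transferred (or removed)
else:
    session.remove(ff)  ## Don't leave the unused flow file un-transferred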


2 REPLIES

Contributor

Why don't you use the MergeContent processor to concatenate the flow file content?
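
If you go that route, a rough starting configuration might look like the sketch below. These are standard MergeContent properties (names can vary slightly between NiFi versions), and the Header/Footer/Demarcator values are assumptions chosen to mimic the '{[' ... ']}' wrapping in the script above, with a comma added between records:

Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Minimum Number of Entries: 100
Maximum Number of Entries: 100
Max Bin Age: 5 min   (so a partial batch still gets flushed)
Delimiter Strategy: Text
Header: {[
Footer: ]}
Demarcator: ,

MergeContent also routes the incoming flow files to its 'original' relationship for you, which is exactly the bookkeeping the script is missing.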
