Member since 02-14-2018 | 3 Posts | 0 Kudos Received | 1 Solution
02-16-2018 03:49 PM
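You have to add a transfer for the original flow files as well. Every flow file you obtain from session.get() must be either transferred or removed before the session commits; otherwise NiFi reports that the transfer relationship is not specified. For example: session.transfer(flowFileList, REL_FAILURE)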
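To make the rule concrete, here is a minimal sketch in plain Python. It does not use the NiFi API: `FlowFile`, `MiniSession`, and `merge_batch` are hypothetical stand-ins written only for illustration, and the `[a,b,...]` join format is an assumption rather than what the original script produces. The model only tracks which flow files a session still has to account for, which is roughly the bookkeeping behind the error.

```python
class FlowFile(object):
    """Hypothetical stand-in for a NiFi FlowFile: it only holds content."""
    def __init__(self, content):
        self.content = content


class MiniSession(object):
    """Hypothetical model of a session; NOT the NiFi ProcessSession API."""
    def __init__(self, queued):
        self._queue = list(queued)  # flow files waiting upstream
        self._pending = []          # flow files this session still owes a destination
        self.routed = {}            # relationship name -> flow files sent there

    def get(self, max_count):
        # Taking flow files from the queue makes the session responsible for them.
        batch = self._queue[:max_count]
        self._queue = self._queue[max_count:]
        self._pending.extend(batch)
        return batch

    def create(self, content):
        # A newly created flow file must be accounted for as well.
        flowfile = FlowFile(content)
        self._pending.append(flowfile)
        return flowfile

    def transfer(self, flowfiles, relationship):
        # Accept either a single flow file or a list of them.
        if isinstance(flowfiles, FlowFile):
            flowfiles = [flowfiles]
        for flowfile in flowfiles:
            self._pending.remove(flowfile)
            self.routed.setdefault(relationship, []).append(flowfile)

    def commit(self):
        # Anything still pending was never given a destination.
        if self._pending:
            raise RuntimeError(
                "transfer relationship not specified for %d FlowFile(s)"
                % len(self._pending))


def merge_batch(session, batch_size=100):
    """Concatenate a batch into one flow file and route the originals too."""
    originals = session.get(batch_size)
    if not originals:
        return None  # nothing was taken or created, so nothing is owed
    merged = session.create("[" + ",".join(f.content for f in originals) + "]")
    session.transfer(merged, "success")
    session.transfer(originals, "failure")  # the step the answer adds
    return merged
```

Calling `merge_batch` and then `commit()` on a `MiniSession` succeeds, while skipping the second `transfer` leaves the originals pending and `commit()` raises. In a real ExecuteScript script the same idea applies to the list returned by `session.get(100)`.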
02-15-2018 12:24 PM
Hey there!

BACKGROUND: I'm looking to set up a NiFi scheduler to run some data transformations. We have clickstream data and need to aggregate user data to compute various statistics, such as which reports are most popular and who the power users are.

SETUP: A QueryElasticSearchHTTP processor is getting the clickstream data, but the flow files contain only 1 record at a time. These are being fed into an ExecuteScript processor that concatenates the flow file contents into a single flow file. This file will be sent to another ExecuteScript processor, which runs a Python script that produces the final data state.

PROBLEM: I'm getting a "Transfer Relationship not specified" error from the ExecuteScript processor that concatenates the flow files.

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processor.io import OutputStreamCallback
from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.nifi.processor.io import StreamCallback
flowFileList = session.get(100)  ## Get next 100 rows
output = '{['  ## Create what will be the new content string
ff = session.create()  ## Create a new flowfile
if not flowFileList.isEmpty():  ## Check that my list isn't empty
    for flowFile in flowFileList:
        reader = PyInputStreamCallback()  ## Create reader
        session.read(flowFile, reader)  ## Read file content
        output = output + reader.getText()  ## Append content to output string
        # Process each FlowFile here
    writer = PyOutputStreamCallback()  ## Create writer
    output = output + ']}'  ## Append ending to output
    writer.setText(output)  ## Set text that will be written to flowfile
    ff = session.write(ff, writer)  ## Write to flowfile
    session.transfer(ff, REL_SUCCESS)  ## Send flowfile to success pipe
# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
    _text = None  ## Just a holder for text
    def __init__(self):
        pass
    def setText(self, text):
        self._text = text
    def process(self, outputStream):
        outputStream.write(bytearray(output.encode('utf-8')))  ## Write output to the flow file
# end class
# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
    _text = None  ## Just a holder for text
    def __init__(self):
        pass
    def getText(self):
        return self._text
    def process(self, inputStream):
        reader = InputStreamReader(inputStream)  ## Make reader
        bufferedReader = BufferedReader(reader)  ## Make input buffer
        self._text = bufferedReader.readLine()  ## Read the file
# end class
Labels: Apache NiFi