09-14-2020 12:49 AM
I have put a detailed solution description here. You should be able to start NiFi and its registry with this docker compose file. It persists several folders on the local disk; the rest are persisted in the Docker volume called 'nifi_data', so the previous state of the other folders (content_repository, database_repository, flowfile_repository, provenance_repository, state and work) is retained.
09-05-2019 05:55 AM
An ExecuteScript script for those who also need to do this and get valid JSON back:

    import re
    import sys
    import json
    import traceback
    from org.apache.nifi.processor.io import StreamCallback
    from org.python.core.util import FileUtil


    def remove_controls(text):
        # Strip line breaks so a substituted value cannot break the JSON.
        return re.sub(r'[\r\n]', '', text)


    class TransformCallback(StreamCallback):
        def __init__(self, flowFile):
            self.flowFile = flowFile

        def process(self, inputStream, outputStream):
            try:
                attrs = self.flowFile.getAttributes()
                pf = FileUtil().wrap(inputStream)
                output = []
                for line in pf.readlines():
                    # Replace every ${attribute} placeholder with its value.
                    for atr in dict(attrs).keys():
                        try:
                            sub = attrs[atr].encode('ascii', 'ignore')
                            sub = remove_controls(sub)
                        except Exception:
                            sub = attrs[atr]
                        # Plain string replacement, because attribute names and
                        # values may contain regex metacharacters ('.', '\').
                        line = line.replace('${' + atr + '}', sub)
                    output.append(line.rstrip("\n"))
                result = '\n'.join(output)
                try:
                    # Parse the substituted text itself; json.dumps on the
                    # list of lines would succeed for any content.
                    json.loads(result)
                except ValueError:
                    raise Exception("COULD NOT POST INVALID JSON")
                outputStream.write(result)
            except:
                traceback.print_exc(file=sys.stdout)
                raise


    flowFiles = session.get(10)
    for flowFile in flowFiles:
        if flowFile is None:
            continue
        flowFile = session.write(flowFile, TransformCallback(flowFile))
        session.transfer(flowFile, REL_SUCCESS)
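
If you want to try the substitution logic outside NiFi first, here is a minimal sketch in plain Python. It is not part of the processor script: the names `substitute_attributes` and `render_json` and the sample attributes are made up for illustration, and it assumes the flow file content is a JSON template with `${attribute}` placeholders.

```python
import json


def substitute_attributes(template, attributes):
    """Replace each ${name} placeholder in template with attributes[name]."""
    result = template
    for name, value in attributes.items():
        # Drop line breaks so a value cannot split a JSON string literal.
        clean = value.replace("\r", "").replace("\n", "")
        result = result.replace("${" + name + "}", clean)
    return result


def render_json(template, attributes):
    """Substitute placeholders, then fail loudly if the result is not JSON."""
    rendered = substitute_attributes(template, attributes)
    try:
        json.loads(rendered)
    except ValueError:
        raise ValueError("substituted content is not valid JSON")
    return rendered


if __name__ == "__main__":
    # Hypothetical template and attributes, for illustration only.
    template = '{"file": "${filename}", "type": "${mime.type}"}'
    attrs = {"filename": "report.csv", "mime.type": "text/csv"}
    print(render_json(template, attrs))
```

Note that a value containing a double quote or a backslash will still produce invalid JSON. The validation step catches that case rather than repairing it.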