Member since
Kudos Received
05:30 PM
# Question as posted:
# The following code transforms a string to JSON and then sends it to Kafka,
# but it is sending some blank messages. Does anyone know how I can control
# what is sent, so that whatever comes out empty is not sent to Kafka?
# This is the code:
#
# Review note: in the version below, a flow file that yields no messages is
# removed from the session instead of being transferred to "success", so no
# empty payload reaches the downstream Kafka publisher.

import sys
import traceback
import string
# NOTE(review): the module paths of `import ...`, `IOUtils` and
# `StreamCallback` were lost when the post was extracted; the paths below are
# the ones normally used in NiFi Jython scripts -- confirm against the
# original script.
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators
from org.apache.nifi.logging import ComponentLog
import datetime
import json

# Field names assigned, in order, to the ';'-separated values of each line.
_KEYS = ("source_timestamp", "signal", "value")


def _parse_messages(text, fecha_envio):
    """Turn ';'-separated lines into a list of message dicts.

    Each non-blank line is split on ';' and its first values are paired with
    ``_KEYS``; values that are blank after stripping are left out. A line
    whose values are all blank produces no message. Every message produced
    gets ``fecha_envio`` stored under the "fecha_envio" key.
    """
    messages = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        message = {k: v for k, v in zip(_KEYS, line.split(";")) if v.strip() != ""}
        if message:
            message["fecha_envio"] = fecha_envio
            messages.append(message)
    return messages


class PyStreamCallback(StreamCallback):
    """Stream callback that rewrites the flow file content as a JSON array.

    The outcome is kept on the instance so that concurrent tasks do not share
    state through a module-level global:

    * ``failed`` is True when an exception was caught while converting.
    * ``message_count`` is the number of messages written to the output.
    """

    def __init__(self, log):
        self.log = log
        self.failed = False
        self.message_count = 0

    def process(self, inputStream, outputStream):
        """Read the input as UTF-8 text and write the JSON-encoded messages.

        Nothing is written when no message was produced; the caller is
        expected to check ``message_count`` and discard such flow files.
        """
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        try:
            # NOTE(review): the call in front of the format string was lost in
            # the post; presumably the current local time -- confirm.
            fecha_envio = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
            messages = _parse_messages(text, fecha_envio)
            self.message_count = len(messages)
            if messages:
                payload = json.dumps(messages)
                outputStream.write(bytearray(payload.encode('utf-8')))
        except Exception:
            self.log.error("Exception in text editor: " + traceback.format_exc())
            self.failed = True


class UpdateAttributes(Processor):
    """Scripted processor that converts each incoming flow file to JSON."""

    __rel_success = Relationship.Builder().description("Success").name("success").build()
    __rel_failure = Relationship.Builder().description("Failure").name("failure").build()

    def __init__(self):
        pass

    def initialize(self, context):
        self.log = context.logger

    def getRelationships(self):
        return set([self.__rel_success, self.__rel_failure])

    def validate(self, context):
        pass

    def setLogger(self, context):
        pass

    def onScheduled(self, context):
        pass

    def onStopped(self, context):
        pass

    def getPropertyDescriptors(self):
        pass

    def onPropertyModified(self, descriptor, newValue, oldValue):
        pass

    def onTrigger(self, context, sessionFactory):
        """Convert one flow file and route it according to the outcome.

        * conversion raised -> the session is rolled back, which returns the
          flow file to the input queue (use ``session.transfer(flowfile,
          self.__rel_failure)`` instead to route it to "failure");
        * no message produced -> the flow file is removed, so nothing empty
          is passed downstream;
        * otherwise -> the flow file is transferred to "success".
        """
        session = sessionFactory.createSession()
        try:
            # Ensure there is work to do.
            flowfile = session.get()
            if flowfile is None:
                return

            callback = PyStreamCallback(self.log)
            flowfile = session.write(flowfile, callback)

            if callback.failed:
                session.rollback()
                return

            if callback.message_count == 0:
                # Blank input: drop it rather than emit an empty message.
                session.remove(flowfile)
            else:
                session.transfer(flowfile, self.__rel_success)
            session.commit()
        except Exception:
            self.log.error(str(sys.exc_info()[0]))
            self.log.error("Exception function onTrigger: " + traceback.format_exc())
            session.rollback()
            raise


processor = UpdateAttributes()

# Thank you for your help
... View more
- Labels:
Apache Kafka
Apache NiFi