Created on 08-25-2022 05:30 PM - edited 08-25-2022 06:05 PM
The following code what it does is to transform a string to json and then send them to kafka.
But it is sending some blank messages.
Does anyone know how I can control what is sent, that is, what comes empty not to send it to kafka.
This is code:
import sys
import traceback
import string
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators
from org.apache.nifi.logging import ComponentLog
import datetime
import json
class PyStreamCallback(StreamCallback):
def __init__(self, log):
self.log = log
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
global error
error = False
messages = []
try:
keys = ["source_timestamp", "signal", "value"]
fecha_envio = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
for line in text.splitlines():
#outputStream.write(bytearray(line.encode('utf-8')))
line = line.strip()
if line != "":
lineSplited = line.split(";")
if len(lineSplited) > 0:
message = {k: v for k,v in zip(keys, lineSplited) if v.strip() != ""}
if len(message) > 0:
message["fecha_envio"] = fecha_envio
messages.append(message)
#if len(messages) > 0:
messages = json.dumps(messages)
outputStream.write(bytearray(messages.encode('utf-8')))
except Exception, e:
self.log.error("Exception in text editor: " + traceback.format_exc())
error = True
#outputStream.write(bytearray(text.encode('utf-8')))
class UpdateAttributes(Processor) :
__rel_success = Relationship.Builder().description("Success").name("success").build()
__rel_failure = Relationship.Builder().description("Failure").name("failure").build()
def __init__(self) :
pass
def initialize(self, context) :
self.log = context.logger
def getRelationships(self) :
return set([self.__rel_success, self.__rel_failure])
def validate(self, context) :
pass
def setLogger(self, context) :
pass
def onScheduled(self, context) :
pass
def onStopped(self, context) :
pass
def getPropertyDescriptors(self) :
pass
def onPropertyModified(self, descriptor, newValue, oldValue) :
pass
def onTrigger(self, context, sessionFactory) :
session = sessionFactory.createSession()
try :
global error
# ensure there is work to do
flowfile = session.get()
if flowfile is None :
return
flowfile = session.write(flowfile,PyStreamCallback(self.log))
# transfer
if not error:
session.transfer(flowfile, self.__rel_success)
else:
#session.transfer(flowfile, self.__rel_failure) #sends the flowfile to failure RS
session.rollback() #Sends the flowfile to the input queue
session.commit()
except Exception:
self.log.error(sys.exc_info()[0])
self.log.error("Exception function onTrigger: " + traceback.format_exc())
session.rollback()
raise
processor = UpdateAttributes()
Thank you for your help
Created 08-26-2022 07:06 PM
@FozzieN ,
Maybe if you uncomment the following if in your code it would help?
if len(messages) > 0:
messages = json.dumps(messages)
outputStream.write(bytearray(messages.encode('utf-8')))
With that if commented out it will always send a message to Kafka even if there are no messages.
Cheers,
André
Created 08-26-2022 07:27 PM
@FozzieN ,
You can do exactly what you're trying to achieve using standard NiFi processors and a schema to describe your data structure. Please see attached a flow example that replicates what you are doing without the need for scripting. This flow uses the following schema, set as a parameter:
{
"type": "record",
"name": "MyData",
"fields": [
{ "name": "source_timestamp", "type": "string" },
{ "name": "signal", "type": "string" },
{ "name": "value", "type": "string" },
{ "name": "fecha_envio", "type": ["null", "string"], "default": null }
]
}
Cheers,
André