Support Questions

Find answers, ask questions, and share your expertise

Processador Nifi Transform string to json from python

avatar
New Contributor

The following code what it does is to transform a string to json and then send them to kafka.

But it is sending some blank messages.

Does anyone know how I can control what is sent, that is, what comes empty not to send it to kafka.

 

This is code:

import sys
import traceback
import string
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators
from org.apache.nifi.logging import ComponentLog
import datetime
import json

class PyStreamCallback(StreamCallback):
    def __init__(self, log):
        self.log = log

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        global error
        error = False
        messages = []
        try:
            keys = ["source_timestamp", "signal", "value"]
            fecha_envio = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
            for line in text.splitlines():
                #outputStream.write(bytearray(line.encode('utf-8')))
                line = line.strip()
                if line != "":
                    lineSplited = line.split(";")

                    if len(lineSplited) > 0:
                        message = {k: v for k,v in zip(keys, lineSplited) if v.strip() != ""}
                        if len(message) > 0:
                            message["fecha_envio"] = fecha_envio
                            messages.append(message)
            #if len(messages) > 0:
            messages = json.dumps(messages)
            outputStream.write(bytearray(messages.encode('utf-8')))

        except Exception, e:
            self.log.error("Exception in text editor: " + traceback.format_exc())
            error = True
            #outputStream.write(bytearray(text.encode('utf-8')))

class UpdateAttributes(Processor) :
    __rel_success = Relationship.Builder().description("Success").name("success").build()
    __rel_failure = Relationship.Builder().description("Failure").name("failure").build()

   
    def __init__(self) :
        pass
    def initialize(self, context) :
        self.log = context.logger
    def getRelationships(self) :
        return set([self.__rel_success, self.__rel_failure])
    def validate(self, context) :
        pass
    def setLogger(self, context) :
        pass
    def onScheduled(self, context) :
        pass
    def onStopped(self, context) :
        pass

    def getPropertyDescriptors(self) :
        pass

    def onPropertyModified(self, descriptor, newValue, oldValue) :
        pass

    def onTrigger(self, context, sessionFactory) :
        session = sessionFactory.createSession()
        try :
            global error

            # ensure there is work to do
            flowfile = session.get()
            if flowfile is None :
                return

            flowfile = session.write(flowfile,PyStreamCallback(self.log))
            # transfer
            if not error:
                session.transfer(flowfile, self.__rel_success)
            else:
                #session.transfer(flowfile, self.__rel_failure) #sends the flowfile to failure RS
                session.rollback() #Sends the flowfile to the input queue
            session.commit()
        except Exception:
            self.log.error(sys.exc_info()[0])
            self.log.error("Exception function onTrigger: " + traceback.format_exc())
            session.rollback()
            raise

processor = UpdateAttributes()



Thank you for your help
2 REPLIES 2

avatar
Super Guru

@FozzieN ,

 

Maybe if you uncomment the following if in your code it would help?

            if len(messages) > 0:
                messages = json.dumps(messages)
                outputStream.write(bytearray(messages.encode('utf-8')))

 

With that if commented out it will always send a message to Kafka even if there are no messages.

 

Cheers,

André

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

avatar
Super Guru

@FozzieN ,

 

You can do exactly what you're trying to achieve using standard NiFi processors and a schema to describe your data structure. Please see attached a flow example that replicates what you are doing without the need for scripting. This flow uses the following schema, set as a parameter:

{
 "type": "record",
 "name": "MyData",
 "fields": [
  { "name": "source_timestamp", "type": "string" },
  { "name": "signal", "type": "string" },
  { "name": "value", "type": "string" },
  { "name": "fecha_envio", "type": ["null", "string"], "default": null }
 ]
}

 

Cheers,

André

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.