Serialize headers sent from PublishKafka_2_6

Rising Star

Hi All, 

This may be a silly question; apologies if so.

How do I serialize the headers sent from the PublishKafka_2_6 processor?

Currently, the processor I am using sends the headers along with the payload as a plain string:

ID:N1,GRANTED-ID:[root,UI1] {
...
<Payload>
...
}

We have other applications communicating with Kafka that send headers along with the payload like this:

GRANTED-ID: ��♣sr‼java.util.ArrayListx��↔��a�♥☺I♦sizexp☻w♦☻t♦roott♦UI1x
ID: ��♣t♣N1{
...
<payload>
...
}

Is it possible for NiFi to send the headers in the format shown above as well?
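
For readers unfamiliar with the second sample: the `sr`, `java.util.ArrayList`, `xp` and `t` markers suggest the header values were written with Java's built-in object serialization rather than as plain text. The following is a minimal sketch of how such bytes could be produced, assuming that reading is correct; the class and method names are hypothetical and not taken from the other applications.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper, for illustration only.
final class SerializedHeaderSketch {

    // Serialize a value with Java's built-in object serialization.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // True when the data starts with the stream magic 0xACED followed by version 0x0005.
    static boolean hasSerializationMagic(byte[] data) {
        return data.length >= 4
                && (data[0] & 0xFF) == 0xAC
                && (data[1] & 0xFF) == 0xED
                && data[2] == 0x00
                && data[3] == 0x05;
    }

    public static void main(String[] args) {
        ArrayList<String> grantedIds = new ArrayList<>(Arrays.asList("root", "UI1"));
        byte[] grantedIdHeader = serialize(grantedIds);
        byte[] idHeader = serialize("N1");

        // Both values begin with the same four-byte stream header.
        System.out.println(hasSerializationMagic(grantedIdHeader)); // true
        System.out.println(hasSerializationMagic(idHeader));        // true

        // The list's class name is embedded in the stream, which is why it is
        // readable in the middle of the otherwise binary header value.
        String asText = new String(grantedIdHeader, StandardCharsets.ISO_8859_1);
        System.out.println(asText.contains("java.util.ArrayList")); // true
    }
}
```

The unreadable characters at the start of each header value in the sample would then be the stream header and type tags, shown by a viewer that expects text.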

 


Expert Contributor

@scoutjohn thank you for posting your query with us. What kind of encoding / serialization format does your other application use to produce messages to Kafka? I can see Message Header Encoding options for the "PublishKafka_2_6" processor (https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.27.0/org.apac...), but I am not sure whether that is the option you are looking for.

Thanks,
Satz

Rising Star

Hi @satz, thanks for the reply. The encoding was ISO-8859-1.

I was able to make it work with a Groovy script:

// Assumption: this runs in an ExecuteScript processor (Groovy engine), which
// provides the `session` and `REL_SUCCESS` bindings.
def flowFile = session.get()
if (flowFile == null) return

def ids = flowFile.getAttribute('TestIDs')
def id = flowFile.getAttribute('ID')

// Header values to serialize, keyed by header name
def headers = [:]

if (ids != null) {
    // Turn a string such as "[root,UI1]" into an ArrayList of trimmed entries
    def idList = new ArrayList()
    ids.replaceAll("[\\[\\]]", "").split(",").each { tenant ->
        idList.add(tenant.trim())
    }
    headers['TestIDs'] = idList
}

if (id != null) {
    headers['ID'] = id
}

// Serialize a header value with Java object serialization
def serializeHeader(headerValue) {
    def byteArrayOutputStream = new ByteArrayOutputStream()
    def objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
    objectOutputStream.writeObject(headerValue)
    objectOutputStream.flush()
    return byteArrayOutputStream.toByteArray()
}

// ISO-8859-1 maps every byte to exactly one character, so the bytes survive as a string
def serializedHeaders = [:]
headers.each { key, value ->
    def serializedValue = serializeHeader(value)
    serializedHeaders[key] = new String(serializedValue, 'ISO-8859-1')
}

// Update the FlowFile with the serialized headers
serializedHeaders.each { key, value ->
    flowFile = session.putAttribute(flowFile, key, value)
}

session.transfer(flowFile, REL_SUCCESS)
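
The script stores binary serialization output in string attributes, so the choice of ISO-8859-1 matters. A rough Java sketch of why is given here, assuming the processor's Message Header Encoding is also set to ISO-8859-1 so the attribute text is turned back into the same bytes; the class and method names are hypothetical, and the `throughString` step only imitates the attribute round trip rather than calling any NiFi API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper, for illustration only.
final class HeaderRoundTripSketch {

    // Same serialization step as serializeHeader in the Groovy script.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // What a Java consumer would do with the received header bytes.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Imitates carrying the bytes in a string and encoding them again with the same charset.
    static byte[] throughString(byte[] data, Charset charset) {
        return new String(data, charset).getBytes(charset);
    }

    public static void main(String[] args) {
        ArrayList<String> ids = new ArrayList<>(Arrays.asList("root", "UI1"));
        byte[] original = serialize(ids);

        byte[] viaLatin1 = throughString(original, StandardCharsets.ISO_8859_1);
        byte[] viaUtf8 = throughString(original, StandardCharsets.UTF_8);

        // ISO-8859-1 is a one-to-one byte/character mapping, so nothing is lost.
        System.out.println(Arrays.equals(original, viaLatin1)); // true
        // The leading 0xAC 0xED bytes are not valid UTF-8 and get replaced.
        System.out.println(Arrays.equals(original, viaUtf8));   // false
        // The ISO-8859-1 copy still deserializes to the original list.
        System.out.println(deserialize(viaLatin1));             // [root, UI1]
    }
}
```

If the header encoding on the publishing side were left at UTF-8, the bytes on the wire would differ from what the script serialized, and consumers expecting Java-serialized values would likely fail to read them.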