Created on 11-06-2024 11:07 PM - edited 11-07-2024 12:14 AM
Hi All,
This may be a silly question, so apologies in advance.
How do I serialize the headers sent from the PublishKafka_2_6 processor?
Currently, the processor I am using sends the headers along with the payload as plain strings:
ID:N1,GRANTED-ID:[root,UI1] {
...
<Payload>
...
}
We have other applications communicating with Kafka, which send their headers (Java-serialized) along with the payload as:
GRANTED-ID: ��♣sr‼java.util.ArrayListx��↔��a�♥☺I♦sizexp☻w♦☻t♦roott♦UI1x
ID: ��♣t♣N1{
...
<payload>
...
}
Is it possible for NiFi to send the headers in the above format as well?
Created 11-15-2024 11:26 PM
@scoutjohn thank you for posting your query with us. What kind of encoding / serialization format does your other application use to produce messages to Kafka? I can see Message Header Encoding options on the "PublishKafka_2_6" processor (https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.27.0/org.apac...), but I am not sure whether that is the option you are looking for.
Created on 01-17-2025 01:39 AM - edited 01-17-2025 01:40 AM
Hi @satz, thanks for the reply. The encoding was ISO-8859-1.
I was able to get this working with a Groovy script:
// Assumed setup (not shown in the original post): fetch the FlowFile from the
// session that NiFi's scripting processors bind, and create the headers map.
def flowFile = session.get()
if (flowFile == null) return
def headers = [:]

def Ids = flowFile.getAttribute('TestIDs')
def id = flowFile.getAttribute('ID')
if (Ids != null) {
    // Turn "[root,UI1]" into a java.util.ArrayList of trimmed strings
    def IdList = new ArrayList()
    Ids.replaceAll("[\\[\\]]", "").split(",").each { tenant ->
        IdList.add(tenant.trim())
    }
    headers['TestIDs'] = IdList
}
if (id != null) {
    headers['ID'] = id
}

// Serialize a header value with standard Java object serialization
def serializeHeader(headerValue) {
    def byteArrayOutputStream = new ByteArrayOutputStream()
    def objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
    objectOutputStream.writeObject(headerValue)
    objectOutputStream.flush()
    return byteArrayOutputStream.toByteArray()
}

// ISO-8859-1 maps every byte to exactly one character, so the bytes survive as a String
def serializedHeaders = [:]
headers.each { key, value ->
    def serializedValue = serializeHeader(value)
    serializedHeaders[key] = new String(serializedValue, 'ISO-8859-1')
}

// Update FlowFile with serialized headers
serializedHeaders.each { key, value ->
    flowFile = session.putAttribute(flowFile, "${key}", value)
}
session.transfer(flowFile, REL_SUCCESS)
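
For anyone who wants to check this approach outside NiFi, here is a minimal Java sketch of the same round trip. It is only an illustration: the class name HeaderRoundTrip and its helper methods are made up for this example and are not part of NiFi or Kafka. It assumes the publishing processor encodes the header attributes back to bytes as ISO-8859-1 (the encoding mentioned above), so that a Java consumer sees the original serialized bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, for illustration only.
public class HeaderRoundTrip {

    // Java-serialize a value, mirroring serializeHeader in the Groovy script.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // What a Java consumer would do with the raw header bytes.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Bytes -> attribute string, as the script does before putAttribute.
    static String toAttribute(byte[] data) {
        return new String(data, StandardCharsets.ISO_8859_1);
    }

    // Attribute string -> bytes, assuming the header is encoded as ISO-8859-1.
    static byte[] fromAttribute(String attribute) {
        return attribute.getBytes(StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) throws Exception {
        List<String> ids = new ArrayList<>(Arrays.asList("root", "UI1"));
        byte[] original = serialize(ids);

        // ISO-8859-1 maps each byte to one character and back, so nothing is lost.
        byte[] recovered = fromAttribute(toAttribute(original));
        System.out.println(Arrays.equals(original, recovered)); // true
        System.out.println(deserialize(recovered));             // [root, UI1]

        // UTF-8 is not safe here: the stream starts with bytes 0xAC 0xED,
        // which are not valid UTF-8 and get replaced during decoding.
        byte[] viaUtf8 = new String(original, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(original, viaUtf8));   // false
    }
}
```

The last check is the reason the character set matters: if the header encoding were left at UTF-8, the serialized bytes would be altered on the way out and the consuming application would likely fail to deserialize them.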