<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Serialize headers sent from PublishKafka_2_6 in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Serialize-headers-sent-from-PublishKafka-2-6/m-p/400521#M250849</link>
    <description>&lt;P&gt;hi&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/27216"&gt;@satz&lt;/a&gt;&amp;nbsp;, thanks for the reploy the encoding was the&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN class="code-quote"&gt;ISO-8859-1&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;I was able to make it possible for with a groovy script&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;def Ids = flowFile.getAttribute('TestIDs') 
def id = flowFile.getAttribute('ID') 
if (Ids != null) {
   
    def IdList = new ArrayList()
    Ids.replaceAll("[\\[\\]]", "").split(",").each { tenant -&amp;gt;
        IdList.add(tenant.trim()) 
    }
    headers['TestIDs'] = IdList
}


if (id != null) {
    headers['ID'] = id
}

// Serialize the headersdef serializeHeader(headerValue) {
    def byteArrayOutputStream = new ByteArrayOutputStream()
    def objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
    objectOutputStream.writeObject(headerValue) 
    objectOutputStream.flush()
    return byteArrayOutputStream.toByteArray()
}

def serializedHeaders = [:]
headers.each { key, value -&amp;gt;
    def serializedValue = serializeHeader(value)
    serializedHeaders[key] = new String(serializedValue, 'ISO-8859-1') 
}

// Update FlowFile with serialized headersserializedHeaders.each { key, value -&amp;gt;
    flowFile = session.putAttribute(flowFile, "${key}", value)
}

session.transfer(flowFile, REL_SUCCESS)
  &lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
    <pubDate>Fri, 17 Jan 2025 09:40:27 GMT</pubDate>
    <dc:creator>scoutjohn</dc:creator>
    <dc:date>2025-01-17T09:40:27Z</dc:date>
    <item>
      <title>Serialize headers sent from PublishKafka_2_6</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Serialize-headers-sent-from-PublishKafka-2-6/m-p/397115#M249699</link>
      <description>&lt;P&gt;Hi All,&amp;nbsp;&lt;/P&gt;&lt;P&gt;This could be a silly question, Apologies for that.&amp;nbsp;&lt;/P&gt;&lt;P&gt;How do i serialize the headers sent from&amp;nbsp;PublishKafka_2_6 processor ?&lt;/P&gt;&lt;P&gt;Currently, the processor i am using is sending headers along with the payload as a string&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;ID:N1,GRANTED-ID:[root,UI1] {
...
&amp;lt;Payload&amp;gt;
...
}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;we have other applications communicating with Kafka , which sends out headers along with payload as&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;GRANTED-ID: ��♣sr‼java.util.ArrayListx��↔��a�♥☺I♦sizexp☻w♦☻t♦roott♦UI1x
ID: ��♣t♣N1{
...
&amp;lt;payload&amp;gt;
...
}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Is it possible for NiFi also to send across as the above format?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 07 Nov 2024 08:14:28 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Serialize-headers-sent-from-PublishKafka-2-6/m-p/397115#M249699</guid>
      <dc:creator>scoutjohn</dc:creator>
      <dc:date>2024-11-07T08:14:28Z</dc:date>
    </item>
    <item>
      <title>Re: Serialize headers sent from PublishKafka_2_6</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Serialize-headers-sent-from-PublishKafka-2-6/m-p/397584#M249908</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/97154"&gt;@scoutjohn&lt;/a&gt;&amp;nbsp;thank you for posting your query with us. What kind of encoding / serialization format does your other application uses to produce messages to kafka ? I can see&amp;nbsp;&lt;A href="https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.27.0/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6/index.html" target="_blank"&gt;https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.27.0/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6/index.html&lt;/A&gt;&amp;nbsp;Message header encoding options with the "&lt;SPAN&gt;PublishKafka_2_6" processor, but not sure if it may be the option which you are looking for&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Sat, 16 Nov 2024 07:26:03 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Serialize-headers-sent-from-PublishKafka-2-6/m-p/397584#M249908</guid>
      <dc:creator>satz</dc:creator>
      <dc:date>2024-11-16T07:26:03Z</dc:date>
    </item>
    <item>
      <title>Re: Serialize headers sent from PublishKafka_2_6</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Serialize-headers-sent-from-PublishKafka-2-6/m-p/400521#M250849</link>
      <description>&lt;P&gt;hi&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/27216"&gt;@satz&lt;/a&gt;&amp;nbsp;, thanks for the reploy the encoding was the&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN class="code-quote"&gt;ISO-8859-1&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;I was able to make it possible for with a groovy script&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;def Ids = flowFile.getAttribute('TestIDs') 
def id = flowFile.getAttribute('ID') 
if (Ids != null) {
   
    def IdList = new ArrayList()
    Ids.replaceAll("[\\[\\]]", "").split(",").each { tenant -&amp;gt;
        IdList.add(tenant.trim()) 
    }
    headers['TestIDs'] = IdList
}


if (id != null) {
    headers['ID'] = id
}

// Serialize the headersdef serializeHeader(headerValue) {
    def byteArrayOutputStream = new ByteArrayOutputStream()
    def objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
    objectOutputStream.writeObject(headerValue) 
    objectOutputStream.flush()
    return byteArrayOutputStream.toByteArray()
}

def serializedHeaders = [:]
headers.each { key, value -&amp;gt;
    def serializedValue = serializeHeader(value)
    serializedHeaders[key] = new String(serializedValue, 'ISO-8859-1') 
}

// Update FlowFile with serialized headersserializedHeaders.each { key, value -&amp;gt;
    flowFile = session.putAttribute(flowFile, "${key}", value)
}

session.transfer(flowFile, REL_SUCCESS)
  &lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 17 Jan 2025 09:40:27 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Serialize-headers-sent-from-PublishKafka-2-6/m-p/400521#M250849</guid>
      <dc:creator>scoutjohn</dc:creator>
      <dc:date>2025-01-17T09:40:27Z</dc:date>
    </item>
  </channel>
</rss>

