Consuming from Kafka topics with the NiFi ConsumeKafka processor splits a multi-line message across flowfiles

Expert Contributor

Hi,

Thanks a lot to this awesome community.

I am listening on a TCP port and sending the data to a PublishKafka processor for publishing. The original message I send is attached below.

The NiFi template is also shown in the picture below.

When I consume using ConsumeKafka, the multi-line message is split across flowfiles. See the image.

I am using NiFi 1.2 and Kafka version 0.10.

Please help

77441-nifi1.png

77442-consume1.png

Thanks a lot


originalmessage.png
1 ACCEPTED SOLUTION

@dhieru singh

Try setting the demarcator on the Consume processor.

9 REPLIES

Expert Contributor

Is there anyone else who has faced this issue?

Sorry, the description does not make clear what the issue is. It would help if you could clarify the problem you are facing, and if there is an error stack trace, please add that as well.

Expert Contributor

@Felix Albani apologies for not making it clear.

I am publishing a multi-line message using the PublishKafka processor. When I consume it using ConsumeKafka, the message is broken apart. Any help would be appreciated, thanks. The message looks like this:

<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'><System><Provider Name='Microsoft-Windows-Security-Auditing' Guid='{54567625-5078-4554-A5BA-3E3B0328C30D}'/><EventID>4672</EventID><Version>0</Version><Level>0</Level><Task>12548</Task><Opcode>0</Opcode><Keywords>0x8020000000000000</Keywords><TimeCreated SystemTime='2008-05-23T18:33:23.071073900Z'/><EventRecordID>510676945</EventRecordID><Correlation/><Execution ProcessID='652' ThreadID='4792'/><Channel>Security</Channel><Computer>prdserver.mycomapny.org</Computer><Security/></System><EventData><Data Name='SubjectUserSid'>S-1-5-21-442726818-4567565561-3997648070-3159</Data><Data Name='SubjectUserName'>app</Data><Data Name='SubjectDomainName'>ADmycomINT</Data><Data Name='SubjectLogonId'>0abfbd2cd</Data><Data Name='PrivilegeList'>SeSecurityPrivilege
			SeBackupPrivilege
			SeRestorePrivilege
			SeTakeOwnershipPrivilege
			SeDebugPrivilege
			SeSystemEnvironmentPrivilege
			SeLoadDriverPrivilege
			SeImpersonatePrivilege</Data></EventData></Event>

@dhieru singh

What is the value of the "Message Demarcator" property?

Expert Contributor
@Wynner

Thanks a lot for looking at the question.

I have not set the Message Demarcator on either the PublishKafka or the ConsumeKafka processor.

Also, when I describe my Kafka topic, it shows "Topic:test PartitionCount:1 ReplicationFactor:1".

Also, the number of concurrent tasks for both ConsumeKafka and PublishKafka is 1.

Thanks a lot

Dhieru

@dhieru singh

Try setting the demarcator on the Consume processor.

Expert Contributor

@Wynner that fixed it. However, when the data itself contains a newline character, it causes a problem. Should I change the message demarcator from a newline (Shift+Enter) to something else?

@dhieru singh

You should use a demarcator that keeps your messages the way you want them.
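
To illustrate why the choice matters, here is a minimal Python sketch. It is not NiFi code and not how the processors are implemented. It only models the general idea of joining messages with a delimiter and splitting them again. The helper names `bundle` and `unbundle` and the `"\x1e"` delimiter are hypothetical, chosen purely for the example.

```python
def bundle(messages, demarcator):
    """Join several messages into one payload, separated by the demarcator."""
    return demarcator.join(messages)


def unbundle(payload, demarcator):
    """Split a payload back into messages on the demarcator."""
    return payload.split(demarcator)


# Two messages; the first contains a newline inside its data,
# like the PrivilegeList value in the event above.
messages = [
    "<Data Name='PrivilegeList'>SeSecurityPrivilege\n\t\t\tSeBackupPrivilege</Data>",
    "<Data Name='SubjectUserName'>app</Data>",
]

# A newline demarcator collides with the newline inside the first message,
# so the round trip yields more pieces than were sent.
newline_round_trip = unbundle(bundle(messages, "\n"), "\n")

# A delimiter that never occurs in the data (assumed here: ASCII record
# separator, a hypothetical choice) round-trips the messages unchanged.
safe_round_trip = unbundle(bundle(messages, "\x1e"), "\x1e")

print(len(messages), len(newline_round_trip), len(safe_round_trip))
```

The same reasoning applies to whatever string you pick as the demarcator: it should be something that cannot appear inside a single message.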

New Contributor

The Message Demarcator on the Kafka consumer could be one solution. Use Shift+Enter and try persisting into HDFS.