Flume TAILDIR Source to Kafka Sink- Static Interceptor Issue

New Contributor

Hello Everyone,

 

The scenario I'm trying to set up is as follows:

1- A Flume TAILDIR source reads from a log file, and a static interceptor adds a value to the beginning of each message. The value consists of the host name and the host IP, because they are required with every log message I receive.

2- A Flume Kafka sink takes those messages and puts them in a Kafka topic.

 

The Flume configuration is as follows:

 

tier1.sources=source1
tier1.channels=channel1
tier1.sinks =sink1

tier1.sources.source1.interceptors=i1


tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###


tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1

tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir = /usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs = /usr/software/flumData/data



tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type = snappy

To test this, I ran a Kafka console consumer and started writing to the source file. I do receive the message with the interceptor value prepended.

 

Example:

I write 'test' in the source file, press Enter, then save the file.

Flume detects the file change and sends the new line to the Kafka producer.

My consumer gets the following line:

###HostName###000.00.0.000###test

 

The issue is that sometimes the interceptor doesn't work as expected. It looks as if Flume sends 2 messages: one containing the interceptor value and the other containing the message content.

 

Example:

I write 'hi you' in the source file, press Enter, then save the file.

Flume detects the file change and sends the new line to the Kafka producer.

My consumer gets the following 2 lines:

###HostName###000.00.0.000###
hi you

And the terminal scrolls to the new message content.

 

This always happens when I type 'hi you' in the text file. Since in production I read from a log file, I can't predict when it will happen.

 

Help and support will be much appreciated ^^

 

Thank you

 

1 ACCEPTED SOLUTION

Re: Flume TAILDIR Source to Kafka Sink- Static Interceptor Issue

New Contributor

I'm back.

 

So far it's working fine.

 

I also found the problem with the writing in the file.

The garbage data between the interceptor and the message can contain literally anything. In this case it contained \n, which is LF, the line ending on Linux systems. This was causing the Kafka problem as well: the Kafka consumer sees the \n and treats the message as 2 messages instead of 1. That's why, when I changed the delimiter to \r\n, it treated the message as 1 message. That's a reasonable conclusion, I guess.
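A possible source of that stray \n, offered as an assumption and not something confirmed in this thread: with useFlumeEventFormat=true the Kafka sink stores the whole Flume event (headers and body) in Avro binary form, and Avro prefixes a byte string with its length as a zigzag-encoded variable-length integer. Under that encoding a 5-byte body such as 'hmmmm' gets the length byte 0x0A, which is LF, a 6-byte body such as 'hi you' gets 0x0C (form feed, which a terminal may also show as a line break), and a 2-byte body such as 'hi' gets 0x04. The sketch below only reproduces the length encoding; the function names are made up for illustration and this is not Flume's code.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer the way the Avro binary format encodes a long."""
    # Zigzag step: maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    # Emit 7 bits per byte, low bits first; the high bit marks "more bytes follow".
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def avro_bytes(body: bytes) -> bytes:
    """Length prefix followed by the raw bytes, as Avro writes a 'bytes' value."""
    return zigzag_varint(len(body)) + body


# Show the single length byte that would precede each body used in this thread.
for body in (b"hi", b"test", b"hmmmm", b"hi you"):
    prefix = avro_bytes(body)[:1]
    print(f"{body!r}: length prefix {prefix!r}")
```

If this is what happens, the line break depends only on the length of the log line, which would fit the observation that the same text always triggers the problem.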

 

If you want to write the message to a file or apply a regex to it, just replace \n and \r with an empty string so you don't have to deal with those control characters.
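A minimal sketch of that replacement, assuming the message has already been decoded to a string. The function name and the sample record are hypothetical.

```python
import re

# Matches the two control characters named above: carriage return and line feed.
_LINE_BREAKS = re.compile(r"[\r\n]")


def strip_line_breaks(text: str) -> str:
    """Return text with every \\r and \\n removed."""
    return _LINE_BREAKS.sub("", text)


# Hypothetical record with a line feed between the interceptor value and the body.
record = "###HostName###000.00.0.000###\nhi you"
print(strip_line_breaks(record))
```

Note that this removes the characters without inserting anything in their place, so the interceptor value and the message end up directly next to each other.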

 

Thanks to everyone who wanted to help me.

 

 

6 REPLIES

Re: Flume TAILDIR Source to Kafka Sink- Static Interceptor Issue

Master Collaborator
Are you sure that Kafka receives two messages? Or is it just how the consumer displays the messages on the terminal? Kafka is key-value based, so try searching the Flume logs to see whether one or two messages were submitted to the topic.
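One way to tell the two cases apart on the consumer side is to print each raw payload with its control characters escaped, so a line break inside a single message can't be mistaken for a second message. This is a plain-Python sketch with a hypothetical sample payload; in a real check the byte strings would come from whatever Kafka client you use.

```python
def describe_payloads(payloads):
    """Return one printable line per message payload, control characters escaped."""
    lines = []
    for index, payload in enumerate(payloads):
        # The !r conversion shows bytes such as \n as escapes instead of
        # letting the terminal act on them.
        lines.append(f"message {index}: {len(payload)} bytes {payload!r}")
    return lines


# Hypothetical input: a single message with a line feed before the body.
sample = [b"###HostName###000.00.0.000###\nhi you"]
for line in describe_payloads(sample):
    print(line)
```

If the list holds one entry whose text contains an escaped \n, the topic received one message and the split happens only when it is displayed or parsed.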

Re: Flume TAILDIR Source to Kafka Sink- Static Interceptor Issue

New Contributor
Well, in production I'm using a StreamSets pipeline with a Kafka Consumer element as the source.

I receive 2 messages: one is the interceptor and the other is the message content without the interceptor. I will test it again now to make sure.

Sometimes I also receive the full message with the interceptor prepended to it. But I notice that there are garbage characters between the interceptor and the message, which I have to parse out using a regular expression.

On the Flume side, no, I'm not sure whether Flume actually sends 2 messages. Can you tell me how I can check that? Is there a parameter in the logging configuration that would show it?

Re: Flume TAILDIR Source to Kafka Sink- Static Interceptor Issue

New Contributor

So I just tested it with StreamSets.

I sent the message 'hmmmm', one of the messages for which the interceptor gets separated from the message itself, and I did receive 2 records:

 

hmmmm.JPG

 

Then I sent another message, 'hi', and received it as 1 record:

 

hi.JPG

 

So there is definitely a problem that I haven't figured out yet.

 

The Flume logs for both cases are shown below. I don't see anything odd there. What do you think?

 

Logs for hmmmm

 

14 Feb 2019 12:20:57,991 INFO  [PollableSourceRunner-TaildirSource-source1] (org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile:283)  - Opening file: /usr/software/flumData/flumeStressAndKafkaFailureTest.txt, inode: 1275070426, pos: 21
14 Feb 2019 12:21:19,593 INFO  [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:227)  - Start checkpoint for /usr/software/flumData/checkpoint/checkpoint, elements to sync = 1
14 Feb 2019 12:21:19,596 INFO  [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:252)  - Updating checkpoint metadata: logWriteOrderID: 1550135209575, queueSize: 0, queueHead: 66366
14 Feb 2019 12:21:19,599 INFO  [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.Log.writeCheckpoint:1052)  - Updated checkpoint for file: /usr/software/flumData/data/log-19 position: 1497 logWriteOrderID: 1550135209575

 

 

Logs for hi

14 Feb 2019 12:22:49,600 INFO  [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:227)  - Start checkpoint for /usr/software/flumData/checkpoint/checkpoint, elements to sync = 1
14 Feb 2019 12:22:49,607 INFO  [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:252)  - Updating checkpoint metadata: logWriteOrderID: 1550135209580, queueSize: 0, queueHead: 66366
14 Feb 2019 12:22:49,613 INFO  [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.Log.writeCheckpoint:1052)  - Updated checkpoint for file: /usr/software/flumData/data/log-19 position: 1701 logWriteOrderID: 1550135209580

 

Re: Flume TAILDIR Source to Kafka Sink- Static Interceptor Issue

New Contributor

Here is a screenshot of the output when writing the 2 messages to a UTF-8 file.

I used Notepad++ to show all symbols:

 

You can see the garbage characters between the interceptor and the message.

 

RealOutput.JPG

 

 

Another thing I noticed: when I configure the Kafka consumer data format as Text, I receive 2 messages (interceptor + message content), but when I configure it as Binary I receive only 1 message.

 

So maybe the problem is with the Kafka consumer data format?
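The difference between the two data formats can be imitated in a few lines. This is a sketch under the assumption that a text-format consumer cuts each payload into records at a delimiter, while a binary-format consumer passes the payload through whole. The function names are hypothetical and this is not StreamSets' code.

```python
def text_records(payload: bytes, delimiter: bytes = b"\n"):
    """Split one payload into records at the delimiter, dropping empty pieces."""
    return [part for part in payload.split(delimiter) if part]


def binary_records(payload: bytes):
    """Treat the whole payload as a single record."""
    return [payload]


# Hypothetical payload: one Kafka message with a line feed before the body.
payload = b"###HostName###000.00.0.000###\nhmmmm"

print(len(text_records(payload)))           # default LF delimiter
print(len(text_records(payload, b"\r\n")))  # custom CRLF delimiter
print(len(binary_records(payload)))         # binary: payload kept whole
```

With the LF delimiter the single payload becomes two records. With a CRLF delimiter or in binary mode it stays one record, and the \n is still inside it.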

Re: Flume TAILDIR Source to Kafka Sink- Static Interceptor Issue

New Contributor

So, I did the following in the Kafka Consumer element of StreamSets:

 

KafkaConsumerConfig.JPG

 

 

 

And now it works fine!

 

I will keep testing it and come back here in a couple of days to mark the post as solved.

 

I still notice that the file I write to has more lines than the number of records that go into it. Even with Ignore Control Characters enabled in the Kafka element, it still happens.
