Created on 02-13-2019 05:30 AM - edited 09-16-2022 07:09 AM
Hello Everyone,
The scenario I'm trying to set up is as follows:
1- A Flume TAILDIR source reads from a log file, and a static interceptor prepends a value to the beginning of each message. The value consists of the host name and the host IP, because both are required with every log message I receive.
2- A Flume Kafka producer sink that takes those messages from the file and puts them in a Kafka topic.
The Flume configuration is as follows:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.interceptors=i1
tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###
tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1
tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir=/usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs=/usr/software/flumData/data
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type=snappy
To test this, I ran a Kafka console consumer and started writing to the source file, and I do receive the message with the header prepended.
Example:
I write 'test' in the source file, press Enter, then save the file.
Flume detects the file change, then sends the new line to the Kafka producer.
My consumer gets the following line:
###HostName###000.00.0.000###test
The issue is that sometimes the interceptor doesn't work as expected. It's as if Flume sends 2 messages: one containing the interceptor value and the other the message content.
Example:
I write 'hi you' in the source file, press Enter, then save the file.
Flume detects the file change, then sends the new line to the Kafka producer.
My consumer gets the following 2 lines:
###HostName###000.00.0.000###
hi you
And the terminal scrolls to the new message content.
This always happens when I type 'hi you' in the text file. Since in practice I read from a log file, I can't predict when it will happen.
Help and support will be much appreciated ^^
Thank you
Created 02-19-2019 05:17 AM
Back
So far it's working fine.
I also found the cause of the problem when writing to the file.
The garbage data between the interceptor and the message can contain literally anything. In this case it contained \n, which is the LF (line feed) on Linux systems. This was causing the Kafka problem as well: the Kafka consumer sees the \n and treats the message as 2 messages, not 1. That's why, when I changed the delimiter to \r\n, it treated the message as 1 message. That's a good conclusion, I guess.
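To illustrate the delimiter effect described above, here is a minimal sketch. It is not the consumer's real code; the function name split_records and the sample payload are made up for illustration, and the payload only imitates a message with a stray LF between the interceptor value and the body:

```python
def split_records(payload: bytes, delimiter: bytes) -> list:
    """Split a raw payload into records on the given delimiter,
    dropping empty pieces (hypothetical helper, for illustration only)."""
    return [part for part in payload.split(delimiter) if part]


# Sample payload: interceptor value, a stray LF byte, then the message body.
payload = b"###HostName###000.00.0.000###\nhmmmm"

# With "\n" as the delimiter the single message becomes 2 records.
print(split_records(payload, b"\n"))

# With "\r\n" as the delimiter the lone "\n" no longer matches,
# so the message stays 1 record.
print(split_records(payload, b"\r\n"))
```

This only shows why the choice of delimiter changes the record count; it says nothing about where the stray byte comes from.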
If you want to write the messages to a file or apply a regex to them, just replace \n and \r with an empty string so you don't have to deal with those annoying control characters.
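The replacement described above could look like the following sketch. The function name strip_line_breaks is hypothetical, and it assumes the message has already been decoded to a string:

```python
def strip_line_breaks(text: str) -> str:
    """Remove every CR and LF character from the text
    (hypothetical helper, for illustration only)."""
    return text.replace("\r", "").replace("\n", "")


# Example: a message with a stray LF in the middle and CRLF at the end.
cleaned = strip_line_breaks("###HostName###000.00.0.000###\nhmmmm\r\n")
print(cleaned)
```

Note that this joins the interceptor value and the body with nothing in between, so it only fits when the header itself ends with a recognizable marker such as the ### used here.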
Thanks to whoever wanted to help me.
Created 02-13-2019 11:29 PM
Created 02-14-2019 12:57 AM
Created 02-14-2019 01:27 AM
So I just tested it with Streamsets.
I sent the message 'hmmmm', which is one of the messages where Flume separates the interceptor from the message itself, and I did receive 2 records:
Then I sent another message, 'hi', and received it as 1 record:
So there is definitely a problem that I haven't figured out yet.
The Flume logs for both cases are shown below. I don't see anything weird there. What do you guys think?
Logs for hmmmm
14 Feb 2019 12:20:57,991 INFO [PollableSourceRunner-TaildirSource-source1] (org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile:283) - Opening file: /usr/software/flumData/flumeStressAndKafkaFailureTest.txt, inode: 1275070426, pos: 21
14 Feb 2019 12:21:19,593 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:227) - Start checkpoint for /usr/software/flumData/checkpoint/checkpoint, elements to sync = 1
14 Feb 2019 12:21:19,596 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:252) - Updating checkpoint metadata: logWriteOrderID: 1550135209575, queueSize: 0, queueHead: 66366
14 Feb 2019 12:21:19,599 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.Log.writeCheckpoint:1052) - Updated checkpoint for file: /usr/software/flumData/data/log-19 position: 1497 logWriteOrderID: 1550135209575
Logs for hi
14 Feb 2019 12:22:49,600 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:227) - Start checkpoint for /usr/software/flumData/checkpoint/checkpoint, elements to sync = 1
14 Feb 2019 12:22:49,607 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:252) - Updating checkpoint metadata: logWriteOrderID: 1550135209580, queueSize: 0, queueHead: 66366
14 Feb 2019 12:22:49,613 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.Log.writeCheckpoint:1052) - Updated checkpoint for file: /usr/software/flumData/data/log-19 position: 1701 logWriteOrderID: 1550135209580
Created 02-14-2019 01:47 AM
Here is a screenshot when writing the 2 messages to a UTF8 file.
I used Notepad++ to show all symbols:
You can see the garbage characters between the Interceptor and the message
Another thing I noticed: when I configure the Kafka consumer data format as Text, I receive 2 messages (interceptor + message content), but when I configure it as Binary, I only receive 1 message.
So maybe the problem is with Kafka Consumer Data Format?
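A possible explanation for why only some messages split, offered as a sketch and not a confirmed diagnosis: the sink config sets useFlumeEventFormat=true, which, as far as I understand the Flume Kafka sink, stores each event in Flume's Avro binary format instead of the raw body. In Avro, a bytes field is preceded by its length encoded as a zigzag varint, so the byte just before the body depends on the body length. The helper name zigzag_varint is made up for illustration, and the sketch assumes the body is written as a single Avro bytes field:

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer the way Avro encodes a long:
    zigzag mapping followed by a variable-length base-128 encoding.
    (Hypothetical helper, for illustration only.)"""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes map to small codes
    out = bytearray()
    while True:
        low7 = z & 0x7F
        z >>= 7
        if z:
            out.append(low7 | 0x80)  # more bytes follow
        else:
            out.append(low7)         # last byte
            return bytes(out)


# Length prefix that would precede each test message body (assumed layout).
for body in ("test", "hi", "hmmmm", "hi you"):
    prefix = zigzag_varint(len(body.encode("utf-8")))
    print(f"{body!r}: length prefix 0x{prefix.hex()}")
```

Under that assumption, a 5-byte body such as 'hmmmm' gets the length prefix 0x0a, which is LF, while 'hi' gets 0x04 and 'test' gets 0x08. A 6-byte body such as 'hi you' gets 0x0c (form feed), which some terminals may also render as a line break. That would be consistent with a text-format consumer splitting the record while a binary-format consumer does not.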
Created 02-14-2019 02:34 AM
So, I did the following in the Kafka Consumer element of Streamsets:
And now it works fine!
I will keep testing it and come back here in a couple of days to mark the post as a solution.
I still notice that the file I write into has more lines than the number of records that go into it. Even though I enabled Ignore Control Characters in the Kafka element, it still happens.