Member since
02-13-2019
02-19-2019
05:17 AM
I'm back. So far it's working fine. I also found the problem with writing to the file. The garbage data between the interceptor and the message can contain literally anything. In my case it contained \n, which is the LF line ending on Linux systems. This was causing the Kafka problem as well: Kafka sees the \n and assumes that the message is 2 messages, not 1. That's why, when I changed the delimiter to \r\n, it treated the message as 1 message. That's a good conclusion, I guess. If you want to write to a file or apply a regex to the message, just replace \n and \r with an empty string so you don't have to bother with those annoying control characters. Thanks to everyone who wanted to help me.
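A minimal sketch of that clean-up step, written in Python rather than inside the StreamSets pipeline. The function name `strip_line_breaks` and the sample payload are made up for illustration; the single LF in the sample stands in for whatever garbage actually sits between the interceptor and the message.

```python
def strip_line_breaks(payload: str) -> str:
    """Remove every carriage return and line feed from a message."""
    return payload.replace("\r", "").replace("\n", "")


# Hypothetical payload: interceptor value, a stray LF, then the message body.
raw = "###HostName###000.00.0.000###\nhmmmm"
clean = strip_line_breaks(raw)
print(clean)  # ###HostName###000.00.0.000###hmmmm
```

This only removes \r and \n. Other control characters, if present, would need their own handling.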
02-14-2019
02:34 AM
1 Kudo
So, I did the following in the Kafka Consumer element of Streamsets: And now it works fine! I will keep testing it and come back here in a couple of days to mark the post as a solution. I still notice that the file I write into has more lines than the number of records that enter it. Even though I checked the Ignore Control Characters option in the Kafka element, it still happens.
02-14-2019
01:47 AM
Here is a screenshot from writing the 2 messages to a UTF-8 file. I used Notepad++ to show all symbols: You can see the garbage characters between the interceptor and the message. Another thing I noticed: when I configure the Kafka Consumer data format to be Text, I receive 2 messages (interceptor + message content), but when I configure it to be Binary, I only receive 1 message. So maybe the problem is with the Kafka Consumer data format?
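The Text versus Binary difference can be illustrated with a small Python sketch. It is a simplification under stated assumptions, not a description of StreamSets internals: `read_as_text` and `read_as_binary` are hypothetical helpers, and the payload is a made-up example with a single LF between the interceptor value and the message.

```python
def read_as_text(payload: bytes) -> list[str]:
    """Mimic a line-oriented reader: every LF starts a new record."""
    return payload.decode("utf-8", errors="replace").split("\n")


def read_as_binary(payload: bytes) -> list[bytes]:
    """Mimic a binary reader: the whole payload is one record."""
    return [payload]


# Hypothetical Kafka message value with an LF in the middle.
payload = b"###HostName###000.00.0.000###\nhmmmm"
print(len(read_as_text(payload)))    # 2
print(len(read_as_binary(payload)))  # 1
```

A reader that treats LF as a record separator produces two records from one Kafka message, while a reader that takes the payload as a whole produces one.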
02-14-2019
01:27 AM
So I just tested it with Streamsets. I sent the message 'hmmmm', for which Flume separated the interceptor from the message itself, and I did receive 2 records. Then I sent another message, 'hi', and received it as 1 record. So there is a problem for sure that I haven't figured out yet. The logs for Flume in both cases are shown below. I don't see anything weird there. What do you guys think?
Logs for hmmmm:
14 Feb 2019 12:20:57,991 INFO [PollableSourceRunner-TaildirSource-source1] (org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile:283) - Opening file: /usr/software/flumData/flumeStressAndKafkaFailureTest.txt, inode: 1275070426, pos: 21
14 Feb 2019 12:21:19,593 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:227) - Start checkpoint for /usr/software/flumData/checkpoint/checkpoint, elements to sync = 1
14 Feb 2019 12:21:19,596 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:252) - Updating checkpoint metadata: logWriteOrderID: 1550135209575, queueSize: 0, queueHead: 66366
14 Feb 2019 12:21:19,599 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.Log.writeCheckpoint:1052) - Updated checkpoint for file: /usr/software/flumData/data/log-19 position: 1497 logWriteOrderID: 1550135209575
Logs for hi:
14 Feb 2019 12:22:49,600 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:227) - Start checkpoint for /usr/software/flumData/checkpoint/checkpoint, elements to sync = 1
14 Feb 2019 12:22:49,607 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:252) - Updating checkpoint metadata: logWriteOrderID: 1550135209580, queueSize: 0, queueHead: 66366
14 Feb 2019 12:22:49,613 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.Log.writeCheckpoint:1052) - Updated checkpoint for file: /usr/software/flumData/data/log-19 position: 1701 logWriteOrderID: 1550135209580
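One possible explanation for why 'hmmmm' splits and 'hi' does not, which nobody in the thread confirmed: with useFlumeEventFormat=true the Kafka sink writes each event in Flume's Avro binary format, and Avro prefixes a bytes field with its length as a zigzag-encoded varint. Assuming the event body is written that way, the byte just before the body depends only on the body length. The helper name `avro_length_prefix` is made up, and the sketch covers only the length prefix, not the whole Avro record.

```python
def avro_length_prefix(n: int) -> bytes:
    """Encode a non-negative length the way Avro encodes a long:
    zigzag first, then a base-128 varint, least significant group first."""
    z = n << 1  # zigzag of a non-negative number is simply 2 * n
    out = bytearray()
    while True:
        group = z & 0x7F
        z >>= 7
        if z:
            out.append(group | 0x80)  # high bit set: more groups follow
        else:
            out.append(group)
            return bytes(out)


# Messages mentioned in the thread, with the prefix each body length would get.
for body in ("test", "hmmmm", "hi", "hi you"):
    prefix = avro_length_prefix(len(body.encode("utf-8")))
    print(repr(body), prefix)
```

Under this assumption a 5-byte body such as 'hmmmm' is preceded by 0x0A, which is LF, while 'hi' (0x04) and 'test' (0x08) are not. A 6-byte body such as 'hi you' would get 0x0C, a form feed, which may be related to the terminal behaviour described in the first post.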
02-14-2019
12:57 AM
Well, in the production case, I'm using a Streamsets pipeline with a Kafka Consumer element as the source of the stream, and I receive 2 messages: one is the interceptor and the other is the message content without the interceptor. I will test it again now to make sure of that. Sometimes I also receive the full message with the interceptor attached to it, but I notice that there are garbage characters between the interceptor and the message, which I have to parse using a regular expression. On the Flume side, no, I'm not sure whether Flume actually sends 2 messages. Can you tell me how I can check something like that? Is there a parameter in the logging configuration that would show such a thing?
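A rough sketch of the kind of regular expression meant here, in Python. The pattern, the group names, and `parse_event` are all hypothetical. It assumes the interceptor value has the form ###HostName###IP### shown in the Flume configuration and that the garbage consists only of ASCII control characters, which may be too narrow given that the garbage can contain anything.

```python
import re

# Hypothetical pattern: interceptor value "###<host>###<ip>###", then any run
# of ASCII control characters (the "garbage"), then the message body.
EVENT_RE = re.compile(
    r"###(?P<host>[^#]+)###(?P<ip>[^#]+)###[\x00-\x1f\x7f]*(?P<body>.*)",
    re.DOTALL,
)


def parse_event(raw: str):
    """Return (host, ip, body), or None if the interceptor value is missing."""
    m = EVENT_RE.search(raw)
    if m is None:
        return None
    return m.group("host"), m.group("ip"), m.group("body")


# Made-up sample with two control characters between interceptor and body.
print(parse_event("###HostName###000.00.0.000###\x00\nhi you"))
# ('HostName', '000.00.0.000', 'hi you')
```

Any garbage byte that is not a control character would end up at the start of the body, so the character class would need widening for real data.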
02-13-2019
05:30 AM
Hello everyone,
The scenario I'm trying to implement is as follows:
1- A Flume TAILDIR source reads from a log file and adds a static interceptor to the beginning of the message. The interceptor consists of the host name and the host IP, because they are required with every log message I receive.
2- A Flume Kafka Producer sink takes those messages from the file and puts them in a Kafka topic.
The Flume configuration is as follows:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.interceptors=i1
tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###
tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1
tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir = /usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs = /usr/software/flumData/data
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type = snappy
So now I'm testing. I ran a console Kafka consumer and started writing to the source file, and I do receive the message with the header attached.
Example: I write 'test' in the source file, press Enter, then save the file. Flume detects the file change, then sends the new line to the Kafka producer. My consumer gets the following line:
###HostName###000.00.0.000###test
The issue now is that sometimes the interceptor doesn't work as expected. It's as if Flume sends 2 messages, one containing the interceptor and the other the message content.
Example: I write 'hi you' in the source file, press Enter, then save the file. Flume detects the file change, then sends the new line to the Kafka producer. My consumer gets the following 2 lines:
###HostName###000.00.0.000###
hi you
And the terminal scrolls to the new message content. This always happens when I type 'hi you' in the text file, and since in production I read from a log file, it's not predictable when it will happen.
Help and support will be much appreciated ^^ Thank you
Labels:
- Apache Flume
- Apache Kafka