Flume - Kafka integration, Flume sync with Kafka

New Contributor

Hi All,

I have a Spark application that processes a log file. The data flow is something like this:

Log File --> Flume --> Kafka --> Spark --(after processing)--> Hive

Everything is working properly except, I guess, Flume. I am not getting continuous updates from my log file in the Kafka consumer.

To start the Kafka consumer I use the command:

kafka-console-consumer.sh --zookeeper hadoopmaster:2181 --bootstrap-server hadoopmaster:9092 --topic memoryChannel --from-beginning

But the strange thing is that when I start the Flume agent for the first time, I get all the messages that are already in the log file in the Kafka consumer. However, if I then open the log file, add some more lines, and save it, those new lines are not propagated to the Kafka consumer or to Spark.

I start the Flume agent with:

/home/hduser/flume/flume/bin/flume-ng agent -c /home/hduser/flume/flume/conf -f /home/hduser/flume/flume_kafka_sync.conf -n agent

There are no errors in the Flume log file.

My Flume conf file is something like this:

agent.sources  = source1
agent.channels = channel1
agent.sinks = sink1

# exec source that tails the test log file
agent.sources.source1.type = exec
agent.sources.source1.command = tail -f /home/hduser/test.txt
agent.sources.source1.channels = channel1
agent.sources.source1.interceptors = itime
agent.sources.source1.interceptors.itime.type = timestamp

# in-memory channel
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 10000
agent.channels.channel1.transactionCapacity = 1000

# Kafka sink writing to the memoryChannel topic
agent.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sink1.channel = channel1
agent.sinks.sink1.kafka.bootstrap.servers=hadoopmaster:9092,hadoopslave1:9092
agent.sinks.sink1.kafka.topic = memoryChannel
agent.sinks.sink1.batchsize = 200
agent.sinks.sink1.producer.type = async
agent.sinks.sink1.serializer.class = kafka.serializer.StringEncoder

Am I missing anything in the configuration to get continuous updates? My testing process is that I just open test.txt, add some lines, and save it. Any help will be appreciated.

2 REPLIES

Re: Flume - Kafka integration, Flume sync with Kafka

Contributor

@Biswajit Chakraborty Can you try using the tail command with the -F option rather than -f?
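
With -f, tail keeps reading from the original file descriptor, so if your editor saves the file by writing a new copy (a new inode), tail never sees the added lines. -F follows the file by name and re-opens it when it is replaced or rotated. As a minimal sketch against the conf you posted, only the source command would change:

agent.sources.source1.command = tail -F /home/hduser/test.txt

You can also rule out the editor entirely by appending from the shell, e.g. echo "one more line" >> /home/hduser/test.txt, and checking whether that line shows up in the Kafka consumer.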

Re: Flume - Kafka integration, Flume sync with Kafka

Contributor

@Biswajit Chakraborty Did you get a chance to try this? Please update.
