Member since: 07-18-2016
Posts: 12
Kudos Received: 0
Solutions: 1
08-31-2018 04:21 AM
This is a bug in Flume 1.7 (https://issues.apache.org/jira/browse/FLUME-3027). The issue is resolved by adding offsets.clear() after the consumer.commitSync call in the method below; otherwise the stale offsets map causes problems at consumer rebalancing time.

    void commitOffsets() {
        this.consumer.commitSync(offsets);
    }

Either upgrade to Flume 1.8 or add offsets.clear() at the end of this method.
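For reference, the patched method looks roughly like this (a sketch based on the JIRA description, not the exact Flume 1.8 source):

    void commitOffsets() {
        this.consumer.commitSync(offsets);
        // The fix from FLUME-3027: clear the in-memory offsets map once it has
        // been committed, so a later rebalance cannot re-commit stale offsets.
        offsets.clear();
    }

Thanks,
Srinivas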
08-24-2018 11:08 AM
I did not observe any error messages in the Kafka or Flume logs, but I did see the following messages in the Flume logs:

2018-08-24 04:35:47 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-3 since its offset 395690 does not match the expected offset 394118
2018-08-24 04:35:47 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-0 since its offset 81880195 does not match the expected offset 81878380
2018-08-24 04:35:47 DEBUG Fetcher:671 - Ignoring fetched records for partition def-4 since it is no longer fetchable
2018-08-24 04:35:47 DEBUG Fetcher:671 - Ignoring fetched records for partition def-5 since it is no longer fetchable
2018-08-24 04:35:47 DEBUG Fetcher:671 - Ignoring fetched records for partition def-1 since it is no longer fetchable
2018-08-24 04:35:47 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-3 since its offset 395690 does not match the expected offset 394118
2018-08-24 04:35:47 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-0 since its offset 81880195 does not match the expected offset 81878380
2018-08-24 06:11:03 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-4 since its offset 482480 does not match the expected offset 482450

There are no errors or exceptions apart from these messages. Sometimes all of the data does reach the Hive tables, yet the lag stays the same. Why is the offset consumed by the Flume sink not being updated at the group coordinator?
08-24-2018 11:02 AM
Hi,

I have a strange problem with a Kafka channel topic: consumer group lag of about 15 lakh (1.5 million) events on only one or two partitions. Let me give a little background. The data flows into the system as shown below:

data ingestion ==> Kafka ABC (topic with 3 partitions) ==> Flume source (interceptor) ==> Kafka DEF (topic with 6 partitions) ==> Hive (Parquet files)

We have three edge nodes, and Kafka and Flume run on all of them. The data lake (HDFS, Hive, HBase, Cloudera Manager, Phoenix client) runs on a separate cluster.

Component versions:
Kafka ==> 0.10.1.1
Flume ==> 1.7
CDH ==> 5.14

We stopped all Flume agents for an hour or two and started data ingestion into the Kafka ABC topic at around 12 lakh (1.2 million) events per minute. We then stopped ingestion and started all Flume agents (on all three nodes), and data began arriving in the Hive tables. After a minute or two, we observed that on one, two, or three partitions of the DEF topic (say 1, 4, and 6) the LAG kept increasing constantly relative to the Flume source. Strangely, Kafka ABC (three partitions) worked fine, with its lag spread evenly across partitions. On DEF, the lag on those partitions kept growing and never cleared, while the remaining partitions showed zero lag. The ABC topic's lag cleared, but partitions 1, 4, and 6 of the DEF topic did not, even though no more data was coming into the system; they did not clear over the next day or two either. We restarted Kafka and Flume a couple of times; sometimes the lag on one or two partitions cleared, but never on all of them.

Please find the Flume configuration below:
------------------------------------------
agent.sources = flume-agent
# Channels
agent.channels = kafkachannel
# Sinks
agent.sinks = kite-sink-1 kite-sink-2

# Sources
agent.sources.flume-agent.channels = kafkachannel
agent.sources.flume-agent.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.flume-agent.kafka.bootstrap.servers = example1.host.com:9092,example2.host.com:9092,example3.host.com:9092
agent.sources.flume-agent.kafka.num.consumer.fetchers = 10
agent.sources.flume-agent.kafka.topics = abc
agent.sources.flume-agent.interceptors = abc-interceptor abcinterceptor
agent.sources.flume-agent.interceptors.abc-parameters-interceptor.type = static
agent.sources.flume-agent.interceptors.abcinterceptor.type = com.abc.flume.interceptor.ABCinterceptor$Builder
agent.sources.flume-agent.interceptors.abc-parameters-interceptor.key = flume.avro.schema.url
agent.sources.flume-agent.interceptors.abc-parameters-interceptor.value = hdfs://myhadoop/abc.avsc
agent.sources.flume-agent.interceptors.abc-parameters-interceptor.threadNum = 10
agent.sources.flume-agent.kafka.consumer.security.protocol = PLAINTEXT
agent.sources.flume-agent.kafka.consumer.group.id = my-group

# Sinks
agent.sinks.kite-sink-1.channel = kafkachannel
agent.sinks.kite-sink-1.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.kite-sink-1.kite.repo.uri = repo:hive
agent.sinks.kite-sink-1.kite.dataset.name = abc
agent.sinks.kite-sink-1.kite.batchSize = 100000
agent.sinks.kite-sink-1.kite.rollInterval = 30

agent.sinks.kite-sink-2.channel = kafkachannel
agent.sinks.kite-sink-2.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.kite-sink-2.kite.repo.uri = repo:hive
agent.sinks.kite-sink-2.kite.dataset.name = abc
agent.sinks.kite-sink-2.kite.batchSize = 100000
agent.sinks.kite-sink-2.kite.rollInterval = 30

# Channels
agent.channels.kafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkachannel.brokerList = example1.host.com:9092,example2.host.com:9092,example3.host.com:9092
agent.channels.kafkachannel.kafka.topic = def
agent.channels.kafkachannel.kafka.consumer.group.id = my-group-kite
agent.channels.kafkachannel.parseAsFlumeEvent = true
agent.channels.kafkachannel.kafka.consumer.session.timeout.ms = 60000
agent.channels.kafkachannel.kafka.consumer.request.timeout.ms = 70000
agent.channels.kafkachannel.kafka.consumer.max.poll.records = 100000
agent.channels.kafkachannel.kafka.num.consumer.fetchers = 10

Kafka server.properties settings changed from the defaults:

zookeeper.session.timeout.ms=9000
group.max.session.timeout.ms=60000
group.min.session.timeout.ms=6000
inter.broker.protocol.version=0.10.1.1

Kindly help me. What is wrong in my configuration?
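(For reference: the per-partition lag of the channel's consumer group can be confirmed outside Flume with a small standalone consumer. This is only a sketch; the broker list, the topic def, and the group my-group-kite are taken from the configuration above.)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class DefLagCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "example1.host.com:9092,example2.host.com:9092,example3.host.com:9092");
            props.put("group.id", "my-group-kite");   // the Kafka channel's consumer group
            props.put("enable.auto.commit", "false"); // read-only check: never commit from here
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> partitions = new ArrayList<>();
                consumer.partitionsFor("def").forEach(
                        pi -> partitions.add(new TopicPartition("def", pi.partition())));
                // Log-end offset per partition (available since the 0.10.1 client).
                Map<TopicPartition, Long> ends = consumer.endOffsets(partitions);
                for (TopicPartition tp : partitions) {
                    // Last offset the group committed to the coordinator, if any.
                    OffsetAndMetadata committed = consumer.committed(tp);
                    long pos = (committed == null) ? 0L : committed.offset();
                    System.out.println(tp + " end=" + ends.get(tp)
                            + " committed=" + pos + " lag=" + (ends.get(tp) - pos));
                }
            }
        }
    }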
02-05-2018 08:38 PM
Hi,

I'll give a little background. We are inserting data into Hive tables using Flume through the Kite sink; this is not a streaming job. Flume started using more memory than normal, and suddenly we observed the message below in the Flume logs:

SinkRunner-PollingRunner-DefaultSinkProcessor org.apache.hadoop.hive.metastore.HiveMetaStoreClient.Open org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset

(The original post includes a screenshot with the fuller error message, also ending in "Connection reset".)

Why is Flume connecting to the Hive metastore while inserting data into Hive tables through the Kite sink?

Note: this is not streaming data.
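(A minimal sketch of what the Kite sink does when it loads a Hive-backed dataset, which is the call path where this metastore connection is opened. With kite.repo.uri = repo:hive, the dataset's schema and descriptor live in the Hive metastore; the namespace/name default/abc below is only an illustration.)

    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.Datasets;

    public class KiteHiveLoad {
        public static void main(String[] args) {
            // Loading a Hive-backed Kite dataset fetches its descriptor from the
            // Hive metastore over Thrift, which is where a TTransportException
            // (Connection reset) would surface if that connection drops.
            Dataset<GenericRecord> dataset =
                    Datasets.load("dataset:hive:default/abc", GenericRecord.class);
            System.out.println(dataset.getDescriptor().getSchema());
        }
    }

Thanks,
Yarra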
Labels: Apache Flume, Apache Hive
04-10-2017 03:33 AM
Hi,

We are also getting a similar error:

WARN Auto offset commit failed for group console-consumer-26249: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

We have a three-node cluster. If we kill one of the Kafka nodes, the remaining two hang and continually print the message above without consuming any data. If we bring the downed node back up, all of them resume consuming data without the warning/exception. We are using Kafka 0.10.1.1 on Linux. We tried the consumer properties below, but no luck:

enable.auto.commit = true
auto.commit.interval.ms = 1000

zhuangmz: we can't restart the cluster in production; that is not an acceptable solution in a production environment. Are there any specific properties to resolve this group coordination issue?
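(If it helps to compare, this is a minimal 0.10.1 consumer with the group-coordination settings spelled out. It is a sketch only; the broker hosts, topic, and timeout values are illustrative assumptions, not tested recommendations.)

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class GroupCoordinationTest {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // illustrative hosts
            props.put("group.id", "coordination-test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            // Coordination knobs (0.10.1 names): how quickly a dead member or
            // coordinator is detected, and how long a rebalance may take.
            props.put("session.timeout.ms", "10000");
            props.put("heartbeat.interval.ms", "3000");
            props.put("max.poll.interval.ms", "300000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test-topic")); // illustrative topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.partition() + ":" + record.offset() + " " + record.value());
                    }
                }
            }
        }
    }

Thanks in advance,
Yarra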