
Unable to consume Kafka messages (Cloudera 6)

Explorer

I am using the newly released Cloudera 6. I have a Kafka node set up with ZooKeeper.

 

I am able to produce messages, but unable to consume messages.

 

Here is how I am producing messages:

 

$ kafka-console-producer --batch-size 1 --broker-list <hostname>:9092 --topic TEST
...
ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [<hostname>:9092]
    buffer.memory = 33554432
    client.id = console-producer
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 1000
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 1500
    retries = 3
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 102400
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

18/09/20 19:18:56 INFO utils.AppInfoParser: Kafka version : 1.0.1-cdh6.0.0
18/09/20 19:18:56 INFO utils.AppInfoParser: Kafka commitId : unknown
>TEST MESSAGE
>TEST MESSAGE 2
>


Here is how I am trying to consume messages:

$ kafka-console-consumer --bootstrap-server <hostname>:9092 --topic TEST --from-beginning
....

18/09/20 19:18:07 INFO consumer.ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [hostname:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = console-consumer-77498
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

18/09/20 19:18:07 INFO utils.AppInfoParser: Kafka version : 1.0.1-cdh6.0.0
18/09/20 19:18:07 INFO utils.AppInfoParser: Kafka commitId : unknown

No messages are consumed (the expectation is that they would be printed after the log output above).
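
As a sanity check, the partition end offsets can be queried to confirm the produced messages actually reached the broker (a sketch assuming the stock CDH wrapper scripts are on the PATH; kafka-run-class may need its full path on some installs):

$ kafka-run-class kafka.tools.GetOffsetShell --broker-list <hostname>:9092 --topic TEST --time -1

A nonzero end offset for the TEST partitions would mean the writes succeeded and the problem is on the consumer/coordination side.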

 

We have the same behavior in our Java Spring application.


4 REPLIES

Re: Unable to consume Kafka messages (Cloudera 6)

Explorer
I should mention that I tried flipping this flag to false, but got the same result. It was the only setting that looked like it might need to change.

exclude.internal.topics = false
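
For reference, an override like this can be passed straight to the console consumer via its --consumer-property flag (same command as above, just with the override added):

$ kafka-console-consumer --bootstrap-server <hostname>:9092 --topic TEST --from-beginning --consumer-property exclude.internal.topics=false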

Re: Unable to consume Kafka messages (Cloudera 6)

New Contributor

I am also facing the same issue: the producer is able to produce messages, but the consumer is not able to consume them.

The Kafka version is 4.0.

 

Re: Unable to consume Kafka messages (Cloudera 6)

Super Collaborator
How many brokers have you configured? If it is fewer than 3, you need to make sure that offsets.topic.replication.factor is reduced to match. If that isn't the problem, there should be some indication in the broker logs of what the issue is.
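
One quick way to check (assuming ZooKeeper is listening on its default port 2181) is to describe the internal offsets topic; if it is missing, or its ReplicationFactor is larger than the number of live brokers, group coordination cannot complete and the console consumer will sit there printing nothing:

$ kafka-topics --zookeeper <hostname>:2181 --describe --topic __consumer_offsets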

-pd

Re: Unable to consume Kafka messages (Cloudera 6)

New Contributor

Thank you @pdvorak :)

This resolved my issue. I had set up a 2-broker configuration to test a sample producer-consumer; by default, offsets.topic.replication.factor was set to "3", so changing it to the number of brokers resolved the issue.
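
For anyone else hitting this, the change amounts to the broker property below (shown for a 2-broker cluster; in a Cloudera Manager deployment the property is set in the Kafka service's configuration, and the brokers need a restart before the internal topic is created with the new factor):

offsets.topic.replication.factor=2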

@stanton009 Check on this; it may be the same issue.