
Kafka consumer cannot receive messages from producer

Rising Star

Hi all, I am practicing Kafka on my CDP 7.1.8 cluster with Kerberos enabled.

I can create topics under Kerberos authentication.

However, when I test producing and consuming messages, the consumer side never receives any messages. Here is the console output from each side:

Consumer:

kafka-console-consumer --bootstrap-server host2.my.cloudera.lab:9092 --topic topic001 --from-beginning --consumer.config /root/kafka/krb-client.properties
23/04/03 04:37:00 INFO  utils.Log4jControllerRegistration$: [main]: Registered kafka:type=kafka.Log4jController MBean
23/04/03 04:37:01 INFO  consumer.ConsumerConfig: [main]: ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [host2.my.cloudera.lab:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = console-consumer
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-82044
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = kafka
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = SASL_PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

23/04/03 04:37:01 INFO  authenticator.AbstractLogin: [main]: Successfully logged in.
23/04/03 04:37:01 INFO  kerberos.KerberosLogin: [kafka-kerberos-refresh-thread-null]: [Principal=null]: TGT refresh thread started.
23/04/03 04:37:01 INFO  kerberos.KerberosLogin: [kafka-kerberos-refresh-thread-null]: [Principal=null]: TGT valid starting at: 2023-04-03T02:52:45.000-0400
23/04/03 04:37:01 INFO  kerberos.KerberosLogin: [kafka-kerberos-refresh-thread-null]: [Principal=null]: TGT expires: 2023-04-04T02:52:45.000-0400
23/04/03 04:37:01 INFO  kerberos.KerberosLogin: [kafka-kerberos-refresh-thread-null]: [Principal=null]: TGT refresh sleeping until: 2023-04-03T22:11:03.897-0400
23/04/03 04:37:01 INFO  utils.AppInfoParser: [main]: Kafka version: 3.1.1.7.1.8.0-801
23/04/03 04:37:01 INFO  utils.AppInfoParser: [main]: Kafka commitId: 15839ba4eb998a33
23/04/03 04:37:01 INFO  utils.AppInfoParser: [main]: Kafka startTimeMs: 1680511021242
23/04/03 04:37:01 INFO  consumer.KafkaConsumer: [main]: [Consumer clientId=console-consumer, groupId=console-consumer-82044] Subscribed to topic(s): topic001
23/04/03 04:37:01 INFO  clients.Metadata: [main]: [Consumer clientId=console-consumer, groupId=console-consumer-82044] Resetting the last seen epoch of partition topic001-0 to 0 since the associated topicId changed from null to MyVuTpA9Tfayosq_QihlwA
23/04/03 04:37:01 INFO  clients.Metadata: [main]: [Consumer clientId=console-consumer, groupId=console-consumer-82044] Cluster ID: 7vkx3ceERrKii_vcW_gViQ

 

Producer:

[root@host1 ~]# kafka-console-producer --broker-list host1.my.cloudera.lab:9092 host2.my.cloudera.lab:9092 --topic topic001 --producer.config /root/kafka/krb-client.properties
23/04/03 04:37:44 INFO  utils.Log4jControllerRegistration$: [main]: Registered kafka:type=kafka.Log4jController MBean
23/04/03 04:37:44 INFO  producer.ProducerConfig: [main]: ProducerConfig values:
        acks = -1
        batch.size = 16384
        bootstrap.servers = [host1.my.cloudera.lab:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = console-producer
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 1000
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 1500
        retries = 3
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = kafka
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = SASL_PLAINTEXT
        security.providers = null
        send.buffer.bytes = 102400
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

23/04/03 04:37:44 INFO  producer.KafkaProducer: [main]: [Producer clientId=console-producer] Instantiated an idempotent producer.
23/04/03 04:37:44 INFO  authenticator.AbstractLogin: [main]: Successfully logged in.
23/04/03 04:37:44 INFO  kerberos.KerberosLogin: [kafka-kerberos-refresh-thread-null]: [Principal=null]: TGT refresh thread started.
23/04/03 04:37:44 INFO  kerberos.KerberosLogin: [kafka-kerberos-refresh-thread-null]: [Principal=null]: TGT valid starting at: 2023-04-03T02:52:45.000-0400
23/04/03 04:37:44 INFO  kerberos.KerberosLogin: [kafka-kerberos-refresh-thread-null]: [Principal=null]: TGT expires: 2023-04-04T02:52:45.000-0400
23/04/03 04:37:44 INFO  kerberos.KerberosLogin: [kafka-kerberos-refresh-thread-null]: [Principal=null]: TGT refresh sleeping until: 2023-04-03T23:06:05.063-0400
23/04/03 04:37:44 INFO  utils.AppInfoParser: [main]: Kafka version: 3.1.1.7.1.8.0-801
23/04/03 04:37:44 INFO  utils.AppInfoParser: [main]: Kafka commitId: 15839ba4eb998a33
23/04/03 04:37:44 INFO  utils.AppInfoParser: [main]: Kafka startTimeMs: 1680511064283
>23/04/03 04:37:44 INFO  clients.Metadata: [kafka-producer-network-thread | console-producer]: [Producer clientId=console-producer] Cluster ID: 7vkx3ceERrKii_vcW_gViQ
23/04/03 04:37:44 INFO  internals.TransactionManager: [kafka-producer-network-thread | console-producer]: [Producer clientId=console-producer] ProducerId set to 5 with epoch 0

23/04/03 04:37:48 INFO  clients.Metadata: [kafka-producer-network-thread | console-producer]: [Producer clientId=console-producer] Resetting the last seen epoch of partition topic001-0 to 0 since the associated topicId changed from null to MyVuTpA9Tfayosq_QihlwA
>
>hello
>world
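
For reference, the /root/kafka/krb-client.properties passed to both commands is a minimal Kerberos client config along these lines. The security.protocol and service name match the config dumps above; the exact sasl.jaas.config is hidden in the logs, so the ticket-cache line is an assumption based on the [Principal=null] TGT messages:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
# Assumed: authenticate with the kinit ticket cache, consistent with
# the [Principal=null] TGT refresh lines in the logs above
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useTicketCache=true;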

 

Please help me out with this issue, and let me know if I should provide more information. Thank you.

1 Reply

Super Collaborator

Hello @BrianChan,

For issues like this, the first step is to check the health of the consumer offsets topic (__consumer_offsets) with the Kafka describe command.
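
For example, a check along these lines (the hostname and client properties path are taken from your commands above; adjust to your environment):

kafka-topics --bootstrap-server host2.my.cloudera.lab:9092 \
  --describe --topic __consumer_offsets \
  --command-config /root/kafka/krb-client.properties

In the output, look for partitions with no leader or with an ISR smaller than the replication factor; running --describe --under-replicated-partitions (without --topic) gives a quick cluster-wide view of the same problem. An unhealthy __consumer_offsets topic prevents the consumer from finding its group coordinator, which would be one explanation for why your consumer log above stops before any group-join messages.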

Then check the min.insync.replicas setting of this topic in the describe output. It should be less than or equal to the topic's current in-sync replica (ISR) count. For example, if the topic has a replication factor of 3, min.insync.replicas should be 2 (or 1) so that writes still succeed when a broker fails.
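
Here is a sketch of how to check, and if necessary lower, the setting with kafka-configs, again reusing your host and client properties file:

# Show topic-level overrides, including min.insync.replicas if set
kafka-configs --bootstrap-server host2.my.cloudera.lab:9092 \
  --entity-type topics --entity-name __consumer_offsets --describe \
  --command-config /root/kafka/krb-client.properties

# Example only: relax min.insync.replicas to 1 if it currently
# exceeds the number of in-sync replicas
kafka-configs --bootstrap-server host2.my.cloudera.lab:9092 \
  --entity-type topics --entity-name __consumer_offsets \
  --alter --add-config min.insync.replicas=1 \
  --command-config /root/kafka/krb-client.properties

Note that lowering min.insync.replicas trades durability for availability, so treat the value of 1 as an illustration rather than a recommendation.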

If this response helped with your query, please take a moment to log in and click KUDOS 🙂 and "Accept as Solution" below this post.

Thank you.