Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

KAFKA SSL ENCRYPTION - A QUESTION...

KAFKA SSL ENCRYPTION - A QUESTION...

New Contributor

Hello all, I have been following guide by Cloudera: https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html


All is generated on machine kafka-0, the same I have repeated on kafka-1 and kafka-2 (brokers). The only difference is the name - instead of localhost I was giving kafka-0 / kafka-1 / kafka-2 (seems logical of course). Test for SSL returned proper value. I am talking about this command:

openssl s_client -debug -connect localhost:9093 -tls1

 Now the big issue: Why the heck I cant receive any message from console producer on kafka-0, to for example console-consumer on the same machine (kafka-0). I have been able to do it while they were not secured... Now I am unable to do it...

 

I have configured SSL as security.inter.broker.protocol... All I want to see are messages - and yes on this same machine before I will even check on the others, just to rule out any certification misconfiguration...

 

Hope my question is clear enough and someone can assist me....

 

Thanks!

 

8 REPLIES 8

Re: KAFKA SSL ENCRYPTION - A QUESTION...

Super Collaborator
Can you please provide the console consumer and console producer commands that you are running? Are you specifying a --consumer.config and --producer.config with the relevant properties?

You can add debug to the producer to confirm if it is sending or if there is a 'retryable' error that is preventing it from working:

cp /etc/kafka/conf/tools-log4j.properties /var/tmp
sed -i -e 's/WARN/DEBUG/g' /var/tmp/tools-log4j.properties
export KAFKA_OPTS="-Dlog4j.configuration=file:/var/tmp/tools-log4j.properties -Djava.security.auth.login.config=/home/user/jaas.conf"

-pd

Re: KAFKA SSL ENCRYPTION - A QUESTION...

New Contributor

Thanks for answer. I am running pretty standard commands by logging on into the machine and executing:

kafka-console-producer --broker-list kafka0:9092 --topic KT1

kafka-console-consumer --zookeeper kafka0:2181 --topic KT1

Thats basically it... The machines are part of my normal cluster installation and if I will disable SSL it works normally...

 

Everything is configured by Cloudera so I dont use any other config... Any other deeper logging option does not tell me anything, basically there is no error at all... I am executing these commands on the same machine to rule-out any certificate issues, as while everything was done according to the article I have mentioned it should be working (at least for this machine)...

 

I will be glad to provide any other details if required.

 

 

Re: KAFKA SSL ENCRYPTION - A QUESTION...

Super Collaborator
In order to get this to work with SSL encryption, you need to follow the steps for the producer and consumer here: https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html

If you are not using kerberos, then you don't need to specify a jaas.conf, but you will need to have a client.properties file that has something similar to the following:
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234

(See Step 5 in the docs)

Ensure that you are running the clients with the same command line options as specified in the documentation.

-pd

Re: KAFKA SSL ENCRYPTION - A QUESTION...

Super Collaborator
Also, one more note, if you are using SSL on port 9093, then the broker list must contain that port number and not the default 9092. Your producer command should look something like this:

kafka-console-producer --broker-list kafka0:9093 --topic KT1 --producer.config client.properties

-pd
Highlighted

Re: KAFKA SSL ENCRYPTION - A QUESTION...

New Contributor

OK, it seems that I am getting somewhere (but this is what I once said before hitting the wall with my car):

1. Lets assume I have run all the required commands from the article, configuring the relevant certificates. This should be fine, as for 1 machine its not really a tough job.

2. I have enabled SSL in Cloudera in KAFKA -> Security settings, configured all paths accordingly. This is also pretty straight forward.

Now, This is the command I am running:

kafka-console-producer --broker-list kafka0:9093  --topic KT0X --producer.config config.properties

Content of config. properties:

security.protocol=SSL
ssl.truststore.location=PATH_TO_SERVER_KEYSTORE
ssl.truststore.password=PASSWORD

 Error:

17/03/18 04:53:30 WARN clients.NetworkClient: Error while fetching metadata with correlation id 6 : {KT0X=LEADER_NOT_AVAILABLE} I have tried to find solution for this error but none was working...

 

Any idea? I dare not to start consumer yet....

 

 

Re: KAFKA SSL ENCRYPTION - A QUESTION...

Super Collaborator
The response you got now indicates you are communicating with the cluster, can you run a:
kafka-topics --describe --zookeeper kafka0:2181

And confirm for the topic you are trying to produce to, if the ISR is populated?

-pd

Re: KAFKA SSL ENCRYPTION - A QUESTION...

New Contributor

I have checked it already and thats the output:

Topic:KT0X     PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: TEKX     Partition: 0    Leader: 23      Replicas: 23,24,22      Isr: 23,24,22
        Topic: TEKX     Partition: 1    Leader: 24      Replicas: 24,22,23      Isr: 24,22,23
        Topic: TEKX     Partition: 2    Leader: 22      Replicas: 22,23,24      Isr: 22,23,24

Re: KAFKA SSL ENCRYPTION - A QUESTION...

New Contributor

I have checked it already and thats the output:

Topic:KTO       PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: KTO      Partition: 0    Leader: 23      Replicas: 24,23,22      Isr: 23
        Topic: KTO      Partition: 1    Leader: 23      Replicas: 22,24,23      Isr: 23
        Topic: KTO      Partition: 2    Leader: 23      Replicas: 23,22,24      Isr: 23

 

kafka0 machine id is: 23. Error still persist... Will investigate further as well...

 

 I decided to include log from initiating command in Debug mode (You have suggested previously), what is weird even when in Cloudera ID is 23 - here it is -1 (this could be the problem).

[andre@kafka0 ~]# kafka-console-producer --broker-list kafka0:9093  --topic KTO --producer.config config.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.0-1.2.1.0.p0.115/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.0-1.2.1.0.p0.115/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/03/18 05:43:28 INFO producer.ProducerConfig: ProducerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [kafka0:9093]
        ssl.keystore.type = JKS
        sasl.mechanism = GSSAPI
        max.block.ms = 60000
        interceptor.classes = null
        ssl.truststore.password = [hidden]
        client.id = console-producer
        ssl.endpoint.identification.algorithm = null
        request.timeout.ms = 1500
        acks = 0
        receive.buffer.bytes = 32768
        ssl.truststore.type = JKS
        retries = 3
        ssl.truststore.location = /home/KFSEC/server.keystore.jks
        ssl.keystore.password = null
        send.buffer.bytes = 102400
        compression.type = none
        metadata.fetch.timeout.ms = 60000
        retry.backoff.ms = 100
        ssl.secure.random.implementation = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        max.in.flight.requests.per.connection = 5
        metrics.num.samples = 2
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        batch.size = 16384
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = SSL
        max.request.size = 1048576
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        linger.ms = 1000

17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name bufferpool-wait-time
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name buffer-exhausted-records
17/03/18 05:43:28 DEBUG clients.Metadata: Updated cluster metadata version 1 to Cluster(nodes = [kafka0:9093 (id: -1 rack: null)], partitions = [])
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name connections-closed:
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name connections-created:
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name bytes-sent-received:
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name bytes-sent:
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name bytes-received:
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name select-time:
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name io-time:
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name batch-size
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name compression-rate
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name queue-time
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name request-time
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name produce-throttle-time
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name records-per-request
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name record-retries
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name errors
17/03/18 05:43:28 DEBUG metrics.Metrics: Added sensor with name record-size-max
17/03/18 05:43:28 INFO producer.ProducerConfig: ProducerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [kafka0:9093]
        ssl.keystore.type = JKS
        sasl.mechanism = GSSAPI
        max.block.ms = 60000
        interceptor.classes = null
        ssl.truststore.password = [hidden]
        client.id = console-producer
        ssl.endpoint.identification.algorithm = null
        request.timeout.ms = 1500
        acks = 0
        receive.buffer.bytes = 32768
        ssl.truststore.type = JKS
        retries = 3
        ssl.truststore.location = /home/KFSEC/server.keystore.jks
        ssl.keystore.password = null
        send.buffer.bytes = 102400
        compression.type = none
        metadata.fetch.timeout.ms = 60000
        retry.backoff.ms = 100
        ssl.secure.random.implementation = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        max.in.flight.requests.per.connection = 5
        metrics.num.samples = 2
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        batch.size = 16384
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = SSL
        max.request.size = 1048576
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        linger.ms = 1000

17/03/18 05:43:28 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
17/03/18 05:43:28 INFO utils.AppInfoParser: Kafka commitId : unknown
17/03/18 05:43:28 DEBUG producer.KafkaProducer: Kafka producer started
17/03/18 05:43:28 DEBUG internals.Sender: Starting Kafka producer I/O thread.
asdf
17/03/18 05:43:32 DEBUG clients.NetworkClient: Initialize connection to node -1 for sending metadata request
17/03/18 05:43:32 DEBUG clients.NetworkClient: Initiating connection to node -1 at kafka0:9093.
17/03/18 05:43:32 DEBUG metrics.Metrics: Added sensor with name node--1.bytes-sent
17/03/18 05:43:32 DEBUG metrics.Metrics: Added sensor with name node--1.bytes-received
17/03/18 05:43:32 DEBUG metrics.Metrics: Added sensor with name node--1.latency
17/03/18 05:43:32 DEBUG clients.NetworkClient: Completed connection to node -1
17/03/18 05:43:32 DEBUG clients.NetworkClient: Sending metadata request {topics=[KTO]} to node -1
17/03/18 05:43:32 WARN clients.NetworkClient: Error while fetching metadata with correlation id 0 : {KTO=LEADER_NOT_AVAILABLE}
17/03/18 05:43:32 DEBUG clients.NetworkClient: Sending metadata request {topics=[KTO]} to node -1
17/03/18 05:43:32 WARN clients.NetworkClient: Error while fetching metadata with correlation id 1 : {KTO=LEADER_NOT_AVAILABLE}
17/03/18 05:43:32 DEBUG clients.NetworkClient: Sending metadata request {topics=[KTO]} to node -1
17/03/18 05:43:32 WARN clients.NetworkClient: Error while fetching metadata with correlation id 2 : {KTO=LEADER_NOT_AVAILABLE}
17/03/18 05:43:32 DEBUG clients.NetworkClient: Sending metadata request {topics=[KTO]} to node -1
17/03/18 05:43:33 WARN clients.NetworkClient: Error while fetching metadata with correlation id 3 : {KTO=LEADER_NOT_AVAILABLE}
17/03/18 05:43:33 DEBUG clients.NetworkClient: Sending metadata request {topics=[KTO]} to node -1
17/03/18 05:43:33 WARN clients.NetworkClient: Error while fetching metadata with correlation id 4 : {KTO=LEADER_NOT_AVAILABLE}
17/03/18 05:43:33 DEBUG clients.NetworkClient: Sending metadata request {topics=[KTO]} to node -1