Can't start Kafka Connect on HDP 3.0.1

New Contributor

Hi all! I have to configure Kafka Connect to use a MySQL connector (Debezium), but when I run the command:

bin/connect-distributed.sh config/connect-distributed.properties

it fails after some time with the following error:

INFO Kafka version : 1.1.1.3.0.1.0-187 (org.apache.kafka.common.utils.AppInfoParser:109)
[2020-10-15 10:22:48,074] INFO Kafka commitId : bbbf85928afedf80 (org.apache.kafka.common.utils.AppInfoParser:110)
[2020-10-15 10:24:48,085] WARN [Principal=kafka/sandbox-hdp.hortonworks.com@ADVISE.IT]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin:192)
[2020-10-15 10:24:48,091] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:112)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:77)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
        ... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
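
(For reference, broker reachability and the Kerberos client setup can be sanity-checked outside of Connect with the console tools that ship with Kafka — a minimal sketch, mirroring the keytab and principal from the worker config below; /tmp/client.properties is a scratch file created just for this test:)

# Scratch client config mirroring the worker's security settings
cat > /tmp/client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
   useKeyTab=true \
   storeKey=true \
   keyTab="/etc/security/keytabs/kafka.service.keytab" \
   principal="kafka/sandbox-hdp.hortonworks.com@ADVISE.IT";
EOF

# If this also times out, the problem is broker connectivity/security, not Connect itself
bin/kafka-broker-api-versions.sh \
  --bootstrap-server sandbox-hdp.hortonworks.com:6667 \
  --command-config /tmp/client.properties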

Kerberos is enabled in the cluster. I am also sharing my connect-distributed.properties:

##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.

bootstrap.servers=sandbox-hdp.hortonworks.com:6667

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs

group.id=connect-cluster

heartbeat.interval.ms=3000

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to

key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets, config, and status data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset, config, and status data is never visible outside of Kafka Connect in this format.

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used,
# the replication factor is set to 2 here.

offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used,
# the replication factor is set to 2 here.

config.storage.topic=connect-configs
config.storage.replication.factor=2

rebalance.timeout.ms=60000
schema.generation.enabled=true
schema.generation.value.name=schemavalue
schema.generation.key.name=schemakey

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
   useKeyTab=true \
   storeKey=true \
   keyTab="/etc/security/keytabs/kafka.service.keytab" \
   principal="kafka/sandbox-hdp.hortonworks.com@ADVISE.IT";
producer.sasl.mechanism=GSSAPI
producer.sasl.kerberos.service.name=kafka
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
   useKeyTab=true \
   storeKey=true \
   keyTab="/etc/security/keytabs/kafka.service.keytab" \
   principal="kafka/sandbox-hdp.hortonworks.com@ADVISE.IT";
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
   useKeyTab=true \
   storeKey=true \
   keyTab="/etc/security/keytabs/kafka.service.keytab" \
   principal="kafka/sandbox-hdp.hortonworks.com@ADVISE.IT";

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used,
# the replication factor is set to 2 here.

status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging

offset.flush.interval.ms=10000

# These are provided to inform the user about the presence of the REST host and port configs 
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.

rest.host.name=sandbox-hdp.hortonworks.com
rest.port=3000

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.

rest.advertised.host.name=sandbox-hdp.hortonworks.com
rest.advertised.port=3000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

#plugin.path=/etc/kafka/conf/connector

2 Replies

Master Mentor

@bhoken

In a kerberized cluster, Kafka ACLs are managed by Ranger. If the Ranger Kafka plugin is enabled, look no further than Ranger.

Please share your Ranger Kafka policy.
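
(For reference, the Kafka policies can also be listed through the Ranger Admin REST API — a minimal sketch, assuming the default Ranger Admin port 6080; the credentials and the service name "sandbox_kafka" are placeholders to replace with your own, visible under Ranger Admin > Service Manager:)

# List all policies of the Ranger Kafka service (hypothetical service name "sandbox_kafka")
curl -u admin:admin \
  "http://sandbox-hdp.hortonworks.com:6080/service/public/v2/api/service/sandbox_kafka/policy"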

New Contributor

Hi, and thank you for your reply! Reading the Kafka logs, I found that the controller was null because of an error that occurred while enabling/disabling Kerberos. After deleting the znode and restarting Kafka, I am now able to start Kafka Connect correctly. I have also configured the Debezium plugin, and all the data from the MySQL server is now in Kafka topics. However, I am not able to configure the HDFS sink connector correctly to turn a Kafka topic into a Hive table. Can you please help me? @Shelton
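
(For anyone who hits the same null-controller state: the znode can be inspected and removed with the ZooKeeper shell that ships with Kafka — a minimal sketch, assuming ZooKeeper on the default port 2181. Deleting /controller only forces a new controller election; restart the Kafka brokers afterwards as described above:)

# Inspect the controller znode; a stale or empty value matches the broken state above
bin/zookeeper-shell.sh sandbox-hdp.hortonworks.com:2181 get /controller

# Remove it so a healthy broker is re-elected as controller after the restart
bin/zookeeper-shell.sh sandbox-hdp.hortonworks.com:2181 delete /controller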
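
(On the HDFS sink question, a minimal sketch of a connector submission, assuming the Confluent HDFS sink connector is installed under the worker's plugin.path and reachable on the worker REST port 3000 from the config above; the topic, HDFS/metastore URIs, and database name are placeholders to adapt. Hive integration needs records with schemas, which the JsonConverter with schemas.enable=true above does provide:)

curl -X POST http://sandbox-hdp.hortonworks.com:3000/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "hdfs-sink-mysql",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "tasks.max": "1",
      "topics": "mysql-server.inventory.customers",
      "hdfs.url": "hdfs://sandbox-hdp.hortonworks.com:8020",
      "flush.size": "1000",
      "hive.integration": "true",
      "hive.metastore.uris": "thrift://sandbox-hdp.hortonworks.com:9083",
      "hive.database": "default",
      "schema.compatibility": "BACKWARD"
    }
  }'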