Created 08-12-2021 04:37 AM
I am on a CDH 6.3 cluster with Kerberos enabled, and I submit a Flink 1.9 program to YARN. It needs to consume from a Kafka 0.8 cluster outside our cluster that has no security authentication enabled. How do I configure the job so that it does not try to connect securely to that external Kafka's ZooKeeper? Thanks.
Created 08-19-2021 05:45 AM
Mindset
Kafka is a popular message bus, which has many applications producing into it or consuming from it.
Though Flink is not limited to Kafka, Kafka is its primary source and sink.
Therefore, when looking for Flink-Kafka information, the best place is probably with your Flink supplier.
Practical
Though I do not fully understand the described security limitations, Cloudera offers two ways to interact with Flink.
1. With SQL Stream Builder: a graphical interface for writing SQL that is automatically converted into a Flink job. When using SQL Stream Builder you only need to connect a cluster once, and then all your jobs can use it.
2. Directly with Flink: There are many examples available on connecting with Kafka, for example: https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-kafka.html
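For option 2, here is a minimal sketch of the consumer properties such a job would carry. The host names and group id are placeholders, not values from this thread; in Flink 1.9 the Kafka 0.8 connector (FlinkKafkaConsumer08) reads zookeeper.connect, bootstrap.servers, and group.id from a plain java.util.Properties object:

```java
import java.util.Properties;

public class ExternalKafkaProps {
    // Sketch: properties for consuming an unsecured Kafka 0.8 cluster that
    // sits outside the secured CDH cluster. Host names are placeholders.
    static Properties unsecuredProps() {
        Properties props = new Properties();
        // Kafka 0.8-era consumers locate partitions via ZooKeeper
        props.setProperty("zookeeper.connect", "external-zk:2181");
        props.setProperty("bootstrap.servers", "external-kafka:9092");
        props.setProperty("group.id", "flink-demo-group");
        // No security-related keys: the client connects unauthenticated
        return props;
    }

    public static void main(String[] args) {
        // In the actual job these properties would be handed to the connector,
        // e.g. new FlinkKafkaConsumer08<>("topic", schema, unsecuredProps());
        System.out.println(unsecuredProps().getProperty("zookeeper.connect"));
    }
}
```

The point of the sketch is that nothing on the consumer side asks for Kerberos; as the later replies in this thread show, the secure handshake is instead triggered by Flink's own cluster-wide security configuration.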
Further information
If this does not help, first of all make sure you are using the right version of the software (it is always recommended to use the Cloudera distribution, which makes integration easier).
If you are already using the Cloudera distribution of Flink and need further assistance, please contact the Cloudera Support organization, as they can help you with more detailed information.
Created on 08-12-2021 05:13 AM - edited 08-12-2021 05:14 AM
English Translation of the issue:
TITLE: How does a Flink program on a Kerberos-enabled cluster connect to an unauthenticated Kafka outside the cluster?
I am on a CDH 6.3 cluster with Kerberos enabled, and I submit a Flink 1.9 program to YARN. I need to consume from a Kafka 0.8 cluster outside our cluster that has no security authentication enabled. How do I configure the job not to use a secure connection to that external Kafka's ZooKeeper? Thanks.
Regards,
Vidya Sargur
Created 08-22-2021 10:50 PM
@blueb, Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.
Regards,
Vidya Sargur
Created 08-22-2021 10:56 PM
The Flink configuration file sets security.kerberos.login.contexts: Client,KafkaClient by default; after removing that setting, the job runs normally.
Created on 08-22-2021 11:46 PM - edited 08-22-2021 11:57 PM
Thanks, @blueb, thanks for the update. Posting the English translation here:
security.kerberos.login.contexts: Client,KafkaClient is configured by default in the Flink configuration file; removing that setting allows the job to run normally.
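For reference, the relevant part of flink-conf.yaml looks roughly like this (a sketch; the exact file location depends on your deployment):

```yaml
# Default on a kerberized CDH/CDP cluster: attaches Kerberos JAAS contexts to
# the ZooKeeper client ("Client") and the Kafka client ("KafkaClient").
# security.kerberos.login.contexts: Client,KafkaClient
#
# Commenting the line out (as above) stops the job from attempting a
# SASL/Kerberos handshake with the external, unsecured Kafka's ZooKeeper.
```

Note that this is a cluster-wide setting; removing it affects every Flink job that relies on those Kerberos contexts, not just the one consuming the external Kafka.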
Regards,
Vidya Sargur
Created 08-23-2021 12:51 AM
Hi @blueb
Thank you for the valuable feedback!
Fundamentally, Flink connects to Kafka the same way a regular Java project does; you can refer to this link for the main options involved when a regular Java client connects to Kafka.
If your Kafka cluster that is not managed by CM has no authentication enabled, it falls under "Unsecured".
I found a Flink ↔ Kafka demo project on Cloudera's official GitHub; you can refer to its job.properties.
That repository also includes a demo project for connecting to a secure Kafka, which contains the Kafka connection configuration.
You can see that the job.properties file defines:
kafka.security.protocol=SASL_SSL
SASL_SSL means: SASL is used for authentication (see this link for enabling Kerberos authentication for Kafka in CDP), and SSL/TLS is used for transport encryption (that is, in addition to configuring authentication, "Enable TLS/SSL for Kafka Broker" is turned on in the CM UI). Reference: the official Confluent documentation.
If TLS/SSL is not enabled for transport, you will see listeners = SASL_PLAINTEXT in the Kafka broker log (/var/log/kafka/server.log); if Kerberos authentication (or another SASL mechanism such as LDAP or PAM) is enabled together with "Enable TLS/SSL for Kafka Broker", you will see listeners = SASL_SSL.
Also, note that multiple listeners can be configured at the same time; that is, SASL_PLAINTEXT and SASL_SSL listeners can coexist.
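By contrast, for the unsecured external cluster described in this thread, the analogous job.properties entry would be PLAINTEXT (a sketch; PLAINTEXT is the Kafka client default when no security is configured):

```properties
# Unsecured external Kafka: no SASL authentication, no TLS
kafka.security.protocol=PLAINTEXT

# Secured CDP Kafka, for comparison (as in the demo's job.properties):
# kafka.security.protocol=SASL_SSL
```

One caveat: security.protocol was introduced with the new Kafka clients in 0.9, so the 0.8-era consumer in this thread does not read it at all; for that consumer, simply omitting any security configuration (including the Client/KafkaClient JAAS contexts) is what keeps the connection unauthenticated.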
This demo code also has a YouTube video walkthrough.
The above information is provided for your reference.