Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

在启用kerberos的集群flink程序如何连接集群外未启用认证的kafka

avatar
New Contributor

我在一个启用了kerberos的CDH6.3集群里,提交一个flink1.9版本程序到yarn,需消费集群外一个0.8版本的未启用安全认证的kafka,需要如何设置他不使用安全的方式去连接这个集群外kafka的zookeeper呢?谢谢。

 

org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient - Client will use GSSAPI as SASL mechanism.
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server ark1.analysys.xyz/10.2.33.189:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient - An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client will go to AUTH_FAILED state.
1 ACCEPTED SOLUTION

avatar

Mindset

Kafka is a popular message bus, which has many applications producing into it or consuming from it.

 

Though Flink is not limited to Kafka, the primary source and sink is Kafka.

 

Therefore, when looking for Flink-Kafka information, the best place is probably with your Flink supplier.

 

Practical

Though I do not fully understand the described security limitations, Cloudera has two ways to interact with flink.

1. With the SQL Streambuilder: A Graphical Interface to write SQL which automatically gets converted into a Flink job. When using the SQL streambuilder you only need to connect a cluster once, and then all your jobs can use it.

2. Directly with Flink: There are many examples available on connecting with Kafka, for example: https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-kafka.html

 

Further information

 

If this does not help, first of all make sure to use the right version of the software (It is always recommended to use the Cloudera distribution, which makes integration easier).

If you are already using the Cloudera distribution of Flink and need further assistance, please contact the Cloudera Support organization, as they can help you with more detailed information.

 


- Dennis Jaheruddin

If this answer helped, please mark it as 'solved' and/or if it is valuable for future readers please apply 'kudos'.

View solution in original post

6 REPLIES 6

avatar
Community Manager

English Translation of the issue: 

 

TITLE: How does the flink program of a kerberos-enabled cluster connect to Kafka that is not authenticated outside the cluster?

 

I am in a CDH6.3 cluster with kerberos enabled, and I submit a flink1.9 version program to yarn. I need to consume a 0.8 version of Kafka without security authentication outside the cluster. How to set it not to use a secure way to connect to this What about the zookeeper of Kafka outside the cluster? thanks.



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Learn more about the Cloudera Community:

avatar

Mindset

Kafka is a popular message bus, which has many applications producing into it or consuming from it.

 

Though Flink is not limited to Kafka, the primary source and sink is Kafka.

 

Therefore, when looking for Flink-Kafka information, the best place is probably with your Flink supplier.

 

Practical

Though I do not fully understand the described security limitations, Cloudera has two ways to interact with flink.

1. With the SQL Streambuilder: A Graphical Interface to write SQL which automatically gets converted into a Flink job. When using the SQL streambuilder you only need to connect a cluster once, and then all your jobs can use it.

2. Directly with Flink: There are many examples available on connecting with Kafka, for example: https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-kafka.html

 

Further information

 

If this does not help, first of all make sure to use the right version of the software (It is always recommended to use the Cloudera distribution, which makes integration easier).

If you are already using the Cloudera distribution of Flink and need further assistance, please contact the Cloudera Support organization, as they can help you with more detailed information.

 


- Dennis Jaheruddin

If this answer helped, please mark it as 'solved' and/or if it is valuable for future readers please apply 'kudos'.

avatar
Community Manager

@blueb, Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. 



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Learn more about the Cloudera Community:

avatar
New Contributor

flink配置文件中默认配置了security.kerberos.login.contexts: Client,KafkaClient,取消对应的配置就可以正常执行了。

avatar
Community Manager

Thanks, @blueb, thanks for the update. Posting the English translation here: 

Security.kerberos.login.contexts: Client, KafkaClient are configured by default in the flink configuration file, and the corresponding configuration can be canceled to execute normally.



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Learn more about the Cloudera Community:

avatar
Contributor

Hi @blueb 

感谢您提供的宝贵的反馈!

 

基本上 Flink 连接 Kafka 也是遵照常规 Java 项目使用 Kafka 的模式,您可以参考此链接了解常规 Java client 与 Kafka 连接时的主要选项。

 

您的非 CM 管理的 Kafka 集群若是未启用认证的话,应该属于"Unsecured"。

 

我在 Cloudera 的官方 Github 上找到了一个 Flink ↔ Kafka 的 demo 项目,您可以参考其中的job.properties

另外这个项目还有连接 secure Kafka 的 demo 项目,其中有配置连接 Kafka 的部分。

 

您可以看到 job.properties 文件中定义了: 

kafka.security.protocol=SASL_SSL

 

这个 SASL_SSL 的含义是: 使用 SASL/PLAIN (CDP 中的 Kafka 开启 Kerberos 认证参考此链接) 作为认证方式,并使用 SSL/TLS 作为数据传输方式(也就是除了配置了认证之外,还在CM UI中Enable TLS/SSL for Kafka Broker)。参考: Confluent 官方文档。 

如果传输方式没有Enable TLS/SSL,那么 Kafka Broker 的日志 (/var/log/kafka/server.log) 中,您会看到 listeners = SASL_PLAINTEXT;如果开启了Kerberos 认证 (或LDAP、PAM等其他SASL认证) 又Enable TLS/SSL for Kafka Broker,那么您会看到 listeners = SASL_SSL

另外,值得注意的是,您可以同时配置多个listener,也就是listeners = SASL_PLAINTEXT 和 listeners = SASL_SSL 可以同时存在。

 

另外此 demo 代码也有一个 YouTube 视频演示。 

 

以上信息供您参考。