This is a quick how-to on integrating Spark on Zeppelin with Kerberized Kafka. First, we enable impersonation support in Zeppelin for Spark. By impersonation, I mean that Zeppelin submits jobs to Spark as the user logged in to Zeppelin rather than as the default zeppelin user. In my case, I will be using my own user, "karthik".

92921-screen-shot-2018-10-19-at-123431-am.png

To enable impersonation when submitting to Spark, I recommend using the Livy server. Both the Livy server and the Livy interpreter support impersonation natively. Impersonation with the Spark interpreter is possible, but you may have to configure SSH keys or sudo access for the zeppelin user, which may not be advisable. You can add Livy to HDP by going to a host and installing the "Spark client", followed by the Livy server.

92923-screen-shot-2018-10-16-at-12607-pm.png

Restart Zeppelin to enable the Livy interpreter. Ensure that Livy impersonation and Livy proxy-user permissions are set up correctly in Ambari. By default, this is done when you install the Livy server.

92924-screen-shot-2018-10-19-at-122913-am.png

92925-screen-shot-2018-10-19-at-122936-am.png

Log in to Zeppelin with your credentials. Click the drop-down with your username at the top right and select the Interpreter option.

92926-screen-shot-2018-10-19-at-124220-am.png

In the interpreter menu, scroll down to the Livy interpreter and change the interpreter instantiation to "per user". You can set the process option to scoped or isolated based on your needs; either option works.

92927-screen-shot-2018-10-19-at-124337-am.png

At this point, write some test Spark code and execute it. Check the YARN ResourceManager UI for the application; it should run as the logged-in user, "karthik" in my case.

92928-screen-shot-2018-10-19-at-124549-am.png

I tried this with Spark Streaming. To let Spark Streaming connect to Kafka, you need to include the spark-streaming-kafka jar as a dependency of your Spark job. Add the property and value below to your Livy interpreter, and make sure the jar version matches your Kafka and Spark versions.

92931-screen-shot-2018-10-19-at-10455-am.png
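
In case the screenshot does not render, here is a minimal sketch of how such a property value could be assembled. The property name `livy.spark.jars.packages`, the artifact `spark-streaming-kafka-0-10`, and the versions used are assumptions for illustration, not values read from the original setup; substitute whatever matches your cluster.

```python
def kafka_package_coordinate(spark_version, scala_version="2.11",
                             artifact="spark-streaming-kafka-0-10"):
    """Build a Maven coordinate for the Spark-Kafka integration jar."""
    return "org.apache.spark:{}_{}:{}".format(artifact, scala_version, spark_version)


# Hypothetical versions; check which Spark and Scala versions your cluster runs.
livy_properties = {
    "livy.spark.jars.packages": kafka_package_coordinate("2.3.0"),
}

for name, value in livy_properties.items():
    print("{} = {}".format(name, value))
```

The printed name and value are what would be entered in the Livy interpreter settings.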

Next, the Spark job has to authenticate itself to the Kerberized Kafka. The trick is to make the JAAS config, along with the correct keytab, available to both the Spark driver and the executors. To do this, I created a kafka-jaas config that references my user's keytab and put it in /tmp/ in HDFS. I also put it in the /tmp/ folder of all my data nodes, and copied my keytab to /tmp on all data nodes as well.

When passing the JAAS config or keytab to Spark, I have two options:

1. I can push them both to HDFS and refer to them in my Livy Spark options. The options are exactly the same as what you would pass to spark-submit, except that you add "livy." in front of them. In this case, since I had already copied the keytab to the /tmp folder on the data nodes, I only had to pass the JAAS config as a file to Livy.

92929-screen-shot-2018-10-19-at-125111-am.png

I then passed the name of the file (the part after #) to my Spark driver and executors through the extraJavaOptions settings.

92930-screen-shot-2018-10-19-at-125300-am.png
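
The two screenshots above can be approximated in text form. The sketch below assumes the interpreter properties are `livy.spark.files`, `livy.spark.driver.extraJavaOptions` and `livy.spark.executor.extraJavaOptions` (the spark-submit names with "livy." prepended), and that the JAAS file sits at the hypothetical path `hdfs:///tmp/kafka_jaas.conf`; the exact properties used in the original setup are not recoverable from the text.

```python
def livy_jaas_properties(hdfs_path, alias):
    """Return Livy interpreter properties that ship a JAAS file to the job
    and point the driver and executor JVMs at it."""
    jvm_flag = "-Djava.security.auth.login.config=" + alias
    return {
        # "path#alias": the part after '#' is the file name the job refers to
        "livy.spark.files": "{}#{}".format(hdfs_path, alias),
        "livy.spark.driver.extraJavaOptions": jvm_flag,
        "livy.spark.executor.extraJavaOptions": jvm_flag,
    }


# Hypothetical HDFS location and alias.
props = livy_jaas_properties("hdfs:///tmp/kafka_jaas.conf", "kafka_jaas.conf")
for name in sorted(props):
    print("{} = {}".format(name, props[name]))
```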

This is what my JAAS config looks like. Since I had already copied my keytab to all compute nodes and set the right permissions on it, I did not have to pass it in the config. It may be better to do the same with the JAAS config, so that users can point to it from inside their code and not have to depend on an interpreter config. I show how to do this in the second option.

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
keyTab="/tmp/karthik.keytab"
storeKey=true
serviceName="kafka"
principal="karthik@HORTONWORKS.COM";
};
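
If several users each need their own copy of this file, it could be generated from a template rather than edited by hand. The following is only a sketch: the helper name `render_jaas` is made up for illustration, and the field values mirror the config shown above.

```python
JAAS_TEMPLATE = """KafkaClient {{
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
keyTab="{keytab}"
storeKey=true
serviceName="kafka"
principal="{principal}";
}};
"""


def render_jaas(keytab, principal):
    """Fill in the keytab path and principal for one user."""
    return JAAS_TEMPLATE.format(keytab=keytab, principal=principal)


print(render_jaas("/tmp/karthik.keytab", "karthik@HORTONWORKS.COM"))
```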

2. The second option is to simply pass the JAAS config file location as a JVM system property using System.setProperty() right inside the Spark code. This lets each user use their own JAAS config and keytab without having to change the interpreter config.

System.setProperty("java.security.auth.login.config","/tmp/kafka_jaas.conf")

Run the Spark job, and you should be able to connect to the Kerberized Kafka broker and access topics. If it fails for some reason, check that you have uploaded the JAAS config to the right location in HDFS, that your keytabs have the proper permissions, and that your user has permissions on the topic in Ranger or through ACLs.
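
For the local copies, a small check run on each node can confirm that the files exist and are readable by the current user. This is a sketch under the assumption that the files live at the /tmp paths used in this article; it only inspects the local filesystem of the machine it runs on and does not look at HDFS, Ranger or ACLs.

```python
import os


def check_readable(paths):
    """Return a dict mapping each path to 'ok', 'missing' or 'not readable'."""
    status = {}
    for path in paths:
        if not os.path.isfile(path):
            status[path] = "missing"
        elif not os.access(path, os.R_OK):
            status[path] = "not readable"
        else:
            status[path] = "ok"
    return status


if __name__ == "__main__":
    # Paths taken from this article; adjust to your own layout.
    results = check_readable(["/tmp/kafka_jaas.conf", "/tmp/karthik.keytab"])
    for path, state in results.items():
        print("{}: {}".format(path, state))
```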

Comments

Excellent work, thank you!


Hi,
thanks a lot for this blog. However I didn't manage to make it work, and I have a couple of questions:

1. I'm not using the Livy interpreter, but the Spark one. We have everything set up with it, so I'm wondering if this method can also be used there. For instance, I tried running the Livy interpreter, but I can't make it work (this cluster was originally set up by other people and nobody ever used the Livy interpreter, I think)
2. If it's equivalent with the spark interpreter, I just can't make it work. I'm not sure if I got it properly but this is what I did:
-- a) create the jaas file similar to what you show. Copy it to all nodes with right permissions. I even used the /tmp folder as you do, just in case.

-- b) copy it also to hdfs in the 'same' path with right permissions. Is this correct?

-- c) copy the keytabs to all the nodes, in the same path, with right permissions.

-- d) do not copy the keytab to hdfs

-- e) call System.setProperty before the sc is created in the notebook.

 

Does this make sense? It just doesn't work for me. I get this type of error and unfortunately I can't see all of it.

 

Py4JJavaError: An error occurred while calling o118.load.
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.SecurityException: java.io.IOException: Configuration Error:
No such file or directory
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:172)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:156)
at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:142)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
... 19 more
Caused by: java.io.IOException: Configuration Error:
No such file or directory
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:335)
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:271)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
... 36 more

 

Thanks and best regards,
Valerio


@sajidiqubal Can you share more info? The solution is simple: the Spark Streaming job needs to find the kafka-jaas file and the corresponding keytab. Make sure both paths are accessible on all machines, so the kafka-jaas file and the keytab need to be in a local folder and not in HDFS. If you need them in HDFS, they have to be sent as part of the spark --files and --keytab arguments (IIRC). In newer versions of Kafka, you can add the JAAS info as a Kafka parameter using sasl.jaas.config; see the example below. In that case, you only need the keytab to be available on all machines.

 

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="/etc/security/keytabs/kafka_client.keytab" \
    principal="kafkaclient1@EXAMPLE.COM";

You will also need this parameter:

sasl.kerberos.service.name=kafka
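
For the PySpark Kafka source in the stack trace above, these two parameters would be passed as reader options with a `kafka.` prefix. The sketch below only builds the option dictionary; the broker address and the `SASL_PLAINTEXT` security protocol are assumptions, so use whatever your brokers are configured for.

```python
def kafka_kerberos_options(bootstrap_servers, keytab, principal,
                           security_protocol="SASL_PLAINTEXT"):
    """Build Kafka source options that carry the JAAS entry inline."""
    jaas = (
        "com.sun.security.auth.module.Krb5LoginModule required "
        'useKeyTab=true storeKey=true keyTab="{}" principal="{}";'
    ).format(keytab, principal)
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": security_protocol,  # assumed value
        "kafka.sasl.kerberos.service.name": "kafka",
        "kafka.sasl.jaas.config": jaas,
    }


# Hypothetical broker address; the keytab must exist at this path on every node.
opts = kafka_kerberos_options(
    "broker1.example.com:6667",
    "/etc/security/keytabs/kafka_client.keytab",
    "kafkaclient1@EXAMPLE.COM",
)
for key in sorted(opts):
    print("{} = {}".format(key, opts[key]))
```

In a notebook, the dictionary could then be handed to the reader, for example `spark.readStream.format("kafka").options(**opts).option("subscribe", "my_topic").load()`, where `my_topic` is a placeholder topic name.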

From your error, it looks like the code is not able to find one of the files, either the jaas.conf or the keytab. Please check that each file is in the right path and present on all YARN nodes.