Community Articles

Cloudera Employee

Most online articles already cover the code and the submission command for connecting Spark to Kafka in a Kerberos environment. Here I talk about the other issues you will probably run into while debugging, the kind that are easily overlooked. I went through them and suffered, but no pain, no gain. I record them here and hope they help you.

The environment is HDP 2.6.1.0 kerberized with FreeIPA, Spark 1.6.3 and Kafka 0.10.1. I borrowed the test code from the link below and added the lines that let it run in a kerberized cluster.

https://github.com/eBay/Spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming...

package spark.example;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.regex.Pattern;
import scala.Tuple2;
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
public final class JavaDirectKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile( " " );
    public static void main(String[] args) {
        // Create context with a 200-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext( sparkConf, Durations.seconds( 200 ) );
        HashSet<String> topicsSet = new HashSet<String>( Arrays.asList( "chen" ) );
        HashMap<String, String> kafkaParams = new HashMap<String,String>();
        kafkaParams.put("bootstrap.servers", "cheny0.field.hortonworks.com:6667");
        kafkaParams.put("group.id", "test_sparkstreaming_kafka");
        // Start from the latest offsets when the stream begins
        kafkaParams.put("auto.offset.reset", "largest");
        // HDP's name for Kerberos (SASL) authentication over a plaintext connection
        kafkaParams.put("security.protocol", "PLAINTEXTSASL");
        // Point the JVM at the JAAS login configuration and the Kerberos client configuration
        System.setProperty("java.security.auth.login.config","/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map( new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        } );
        JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList( SPACE.split( x ) );
            }
        } );
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>( s, 1 );
                    }
                } ).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                } );
        wordCounts.print();
        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
} 

The JAAS configuration file holds the principal and the path to its keytab. Here I was lazy and used the kafka service account directly; you should create a dedicated user account for this.

[root@cheny0 ~]# cat /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
doNotPrompt=true
principal="kafka/cheny0.field.hortonworks.com@FIELD.HORTONWORKS.COM"
keyTab="kafka.service.keytab"
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
doNotPrompt=true
principal="kafka/cheny0.field.hortonworks.com@FIELD.HORTONWORKS.COM"
keyTab="kafka.service.keytab"
useTicketCache=true
renewTicket=true
serviceName="kafka";
}; 
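
Before submitting, it can help to confirm that the JVM can parse the JAAS file at all and that it really contains a KafkaClient section. The following is a minimal sketch using only the JDK's standard JAAS configuration parser; the class name JaasFileCheck and its helper method are hypothetical names chosen for illustration, and the check only parses the file (it does not attempt a Kerberos login or verify the keytab).

```java
import java.io.File;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper: parses a JAAS file and reports what a section contains.
final class JaasFileCheck {

    /** Returns the options of the first login module in the section, or null if the section is missing. */
    public static Map<String, ?> sectionOptions(File jaasFile, String section) throws Exception {
        // "JavaLoginConfig" is the JDK's standard file-based JAAS configuration type
        Configuration config =
                Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toURI()));
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(section);
        return (entries == null || entries.length == 0) ? null : entries[0].getOptions();
    }

    public static void main(String[] args) throws Exception {
        // Default to the bare file name, as used in the spark-submit Java options
        File jaas = new File(args.length > 0 ? args[0] : "kafka_client_jaas.conf");
        for (String section : new String[] {"KafkaClient", "Client"}) {
            Map<String, ?> options = sectionOptions(jaas, section);
            if (options == null) {
                System.out.println(section + ": section not found in " + jaas);
            } else {
                System.out.println(section + ": principal=" + options.get("principal")
                        + ", keyTab=" + options.get("keyTab"));
            }
        }
    }
}
```

Run it with the path to the JAAS file as the only argument. A parse error or a missing section here points at the file itself rather than at Spark or Kafka.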

The submission command needs a few more words. To make it convenient to check logs in the Spark History Server, on HDP 2.5 and earlier you have to specify the server address in the command:

# Spark job for kerberized HDP 2.5
spark-submit --master=yarn --deploy-mode=cluster \
--files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.yarn.historyServer.address=http://cheny0.field.hortonworks.com:18080"  \
--conf "spark.eventLog.dir=hdfs:///spark-history"  \
--conf "spark.eventLog.enabled=true" \
--jars /usr/hdp/current/spark-client/lib/spark-assembly-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar,/usr/hdp/current/kafka-broker/libs/kafka_2.10-0.10.1.2.6.1.0-129.jar,/usr/hdp/current/spark-client/lib/spark-examples-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar \
--class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar

This is not needed on HDP 2.6:

# Spark job for kerberized HDP 2.6
spark-submit --master=yarn --deploy-mode=cluster \
--files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--jars /usr/hdp/current/spark-client/lib/spark-assembly-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar,/usr/hdp/current/kafka-broker/libs/kafka_2.10-0.10.1.2.6.1.0-129.jar,/usr/hdp/current/spark-client/lib/spark-examples-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar \
--class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar 

If you want an explanation of the code and command above, refer to the documentation at https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.3/bk_spark-guide/content/spark-streaming-kafk...

With the background out of the way, let's go back to the issues. I will walk you through them one by one.

Zookeeper access right check

Because Kafka needs to read topic and offset information from ZooKeeper znodes, check that the ACLs allow access:

[root@cheny0 tmp]# cd /usr/hdp/current/zookeeper-client/bin
[root@cheny0 bin]# ./zkCli.sh -server cheny0:2181
[zk: cheny0:2181(CONNECTED) 0] getAcl /
'world,'anyone
: cdrwa
[zk: cheny0:2181(CONNECTED) 2] getAcl /consumers
'world,'anyone
: cdrwa
[zk: cheny0:2181(CONNECTED) 4] getAcl /brokers   
'world,'anyone
: cdrwa 

HDFS access right check

[root@cheny0 ~]# hdfs dfs -ls /user
Found 10 items
drwxrwx---   - ambari-qa hdfs          0 2017-09-17 13:34 /user/ambari-qa
drwxr-xr-x   - hbase     hdfs          0 2017-08-20 00:08 /user/hbase
drwxr-xr-x   - hcat      hdfs          0 2017-08-06 00:55 /user/hcat
drwx------   - hdfs      hdfs          0 2017-09-28 03:10 /user/hdfs
drwxr-xr-x   - hive      hdfs          0 2017-08-06 00:55 /user/hive
drwxr-xr-x   - kafka     hdfs          0 2017-09-28 12:23 /user/kafka
drwxrwxr-x   - oozie     hdfs          0 2017-08-06 00:56 /user/oozie
drwxr-xr-x   - hdfs      hdfs          0 2017-08-13 11:47 /user/root
drwxrwxr-x   - spark     hdfs          0 2017-09-08 16:54 /user/spark
drwxr-xr-x   - yarn      hdfs          0 2017-09-16 22:17 /user/yarn

Note that the path "/user/kafka" is not created by default, so you need to create it by hand and give it the right ownership:

[root@cheny0 ~]# hdfs dfs -mkdir /user/kafka
[root@cheny0 ~]# hdfs dfs -chown kafka:hdfs /user/kafka 

Jaas configuration file and Keytab file access right

The Spark Streaming job here is submitted in yarn-cluster mode, so it acts as the yarn user when reading the JAAS and keytab files. Check the permissions on both files. By default a keytab file is readable only by its principal's user and other users cannot access it, so you should grant other users read access to those two files.

[root@cheny0 ~]# ls -l /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf
-rw-r--r-- 1 kafka hadoop 560 Sep 26 13:35 /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf
[root@cheny0 ~]# ls -l /etc/security/keytabs/kafka.service.keytab
-r--r--r-- 1 kafka hadoop 208 Aug  6 01:53 /etc/security/keytabs/kafka.service.keytab
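
The same check can be scripted so it is easy to repeat on each host. This is a small sketch, assuming a POSIX file system; the class name ReadableByOthers is a hypothetical name for illustration, and the two default paths are the ones used in this article. It only reports whether the "other" read bit is set, mirroring what the ls -l output shows.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

// Hypothetical helper: reports whether users other than the owner and group can read a file.
final class ReadableByOthers {

    /** True when the file's POSIX mode grants read permission to "others". */
    public static boolean othersCanRead(Path file) throws IOException {
        Set<PosixFilePermission> perms = Files.getPosixFilePermissions(file);
        return perms.contains(PosixFilePermission.OTHERS_READ);
    }

    public static void main(String[] args) throws IOException {
        String[] defaults = {
            "/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf",
            "/etc/security/keytabs/kafka.service.keytab"
        };
        for (String name : args.length > 0 ? args : defaults) {
            Path p = Paths.get(name);
            if (!Files.exists(p)) {
                System.out.println(name + ": missing");
                continue;
            }
            // Print the mode in the same rwx form that ls -l uses
            String mode = PosixFilePermissions.toString(Files.getPosixFilePermissions(p));
            System.out.println(name + ": " + mode
                    + (othersCanRead(p) ? "" : "  <- not readable by other users"));
        }
    }
}
```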

Specify path of Jaas and Keytab files

As most Spark articles emphasize, the Spark Streaming driver ships the JAAS and keytab files to the executors, which then parse them. The important step is to specify their paths on the driver machine with --files; the Java options then refer to the JAAS file by its bare file name. There is no need to copy the files to all worker nodes by hand.

--files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \ 

If you do not pay enough attention to the points above, you will likely come across errors such as

org.apache.spark.SparkException: Couldn't connect to leader for topic 

or

javax.security.auth.login.LoginException:Unable to obtain password from user 

or

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:Jaas configuration not found 

If all goes well, we move on to some points to note when debugging.

Use Yarn to kill application

The test code never stops because it is a streaming process. A submission that failed may still be running and holding resources in the YARN queue. If you do not clear it out, subsequent submissions will be blocked and queued. It is better to kill such applications with the yarn command.

[root@cheny0 tmp]# yarn application -list -appStates RUNNING
17/09/30 22:22:08 INFO client.RMProxy: Connecting to ResourceManager at cheny1.field.hortonworks.com/172.26.197.243:8050
17/09/30 22:22:09 INFO client.AHSProxy: Connecting to Application History server at cheny1.field.hortonworks.com/172.26.197.243:10200
Total number of applications (application-types: [] and states: [RUNNING]):3
                Application-Id    Application-Name    Application-Type      User     Queue             State       Final-State       Progress                       Tracking-URL
application_1505572583831_0056    spark.example.JavaDirectKafkaWordCount               SPARK     kafka   default           RUNNING         UNDEFINED            10%        http://172.26.197.246:37394
application_1505572583831_0057    spark.example.JavaDirectKafkaWordCount               SPARK     kafka   default           RUNNING         UNDEFINED            10%        http://172.26.197.246:35247
application_1505572583831_0058    spark.example.JavaDirectKafkaWordCount               SPARK     kafka   default           RUNNING         UNDEFINED            10%        http://172.26.197.246:42456
[root@cheny0 tmp]# yarn application -kill application_1505572583831_0056
17/09/30 22:22:33 INFO client.RMProxy: Connecting to ResourceManager at cheny1.field.hortonworks.com/172.26.197.243:8050
17/09/30 22:22:34 INFO client.AHSProxy: Connecting to Application History server at cheny1.field.hortonworks.com/172.26.197.243:10200
Killing application application_1505572583831_0056
17/09/30 22:22:34 INFO impl.YarnClientImpl: Killed application application_1505572583831_0056 
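
When several failed submissions pile up, killing them one by one gets tedious. The sketch below reads the output of the list command from standard input and prints one kill command per matching application, so you can review the commands before running them. The class name YarnAppIds and its helper are hypothetical names for illustration; the parsing simply assumes each application line starts with an id of the form application_<digits>_<digits>, as in the listing above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: turns "yarn application -list" output into kill commands.
final class YarnAppIds {

    // An application line starts (after optional whitespace) with the application id
    private static final Pattern APP_ID = Pattern.compile("^\\s*(application_\\d+_\\d+)");

    /** Returns the ids of all listed applications whose line mentions the given name. */
    public static List<String> idsFor(List<String> lines, String appName) {
        List<String> ids = new ArrayList<>();
        for (String line : lines) {
            Matcher m = APP_ID.matcher(line);
            if (m.find() && line.contains(appName)) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    public static void main(String[] args) throws IOException {
        String appName = args.length > 0 ? args[0] : "spark.example.JavaDirectKafkaWordCount";
        List<String> lines = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        // Only print the commands; running them is left to the operator
        for (String id : idsFor(lines, appName)) {
            System.out.println("yarn application -kill " + id);
        }
    }
}
```

A possible way to use it: yarn application -list -appStates RUNNING | java YarnAppIds spark.example.JavaDirectKafkaWordCount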

Use Yarn resource manager and History server webpage to check log

The YARN ResourceManager UI lets you dive into the containers' logs, since Spark executes tasks distributed across executors running in containers. The History Server lets you check the Spark DAG and the full stack traces printed on each executor; it also collects metrics that help with problem analysis.

Maven: specify HDP repository and dependent jar for compiling purpose

The dependency versions should align with your HDP release, and the jar files should be resolved from the official HDP repository:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <id>hortonworks.extrepo</id>
        <name>Hortonworks HDP</name>
        <url>http://repo.hortonworks.com/content/repositories/releases</url>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <id>hortonworks.other</id>
        <name>Hortonworks Other Dependencies</name>
        <url>http://repo.hortonworks.com/content/groups/public</url>
    </repository>
</repositories>

For this test code, we have the following dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3.2.6.1.0-129</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.3.2.6.1.0-129</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Once the target jar is compiled, submit only the application jar without bundled dependencies, and specify the paths of the dependent jars in the command:

--jars /usr/hdp/current/spark-client/lib/spark-assembly-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar,/usr/hdp/current/kafka-broker/libs/kafka_2.10-0.10.1.2.6.1.0-129.jar,/usr/hdp/current/spark-client/lib/spark-examples-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar \
--class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar

Use Kafka tool to test

After starting the streaming process above, start a producer with:

[root@cheny0 bin]# ./kafka-console-producer.sh --broker-list cheny0:6667 --topic chen --security-protocol PLAINTEXTSASL
Other Notable Pits for Sparkstreaming Connecting Kafka in Kerberized HDP 2.6

Here I typed the title of this article as input, then checked the output log generated by the Spark Streaming process, either from YARN or from the Spark History Server.

-------------------------------------------
Time: 1506791000000 ms
-------------------------------------------

-------------------------------------------
Time: 1506791200000 ms
-------------------------------------------
(Kafka,1)
(2.6,1)
(HDP,1)
(Other,1)
(,1)
(Connecting,1)
(Kerberized,1)
(Notable,1)
(Issue,1)
(in,1)
...

-------------------------------------------
Time: 1506791400000 ms
-------------------------------------------

When you see the result above, it worked. Last but not least, when debugging Spark Streaming access to Kafka in a kerberized environment, you may run into unexpected issues from every direction. If you are not sure about your code, split it into parts and verify them one by one; do not tackle everything at once and get lost. That is my summary of the pain I went through. I hope it goes more smoothly for you than it did for me. Stay awesome!
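
One detail in the output above is the (,1) entry, a count for an empty word. The test code splits each line on a single space, and that kind of split produces an empty string wherever the input has two consecutive spaces or a leading space, which is most likely what happened here. The sketch below reproduces the behaviour outside Spark; SplitDemo and nonEmptyWords are hypothetical names for illustration, and filtering out empty strings is just one possible way to avoid the entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical demo of the split used in the word count.
final class SplitDemo {

    // Same pattern as in the test code: split on exactly one space
    private static final Pattern SPACE = Pattern.compile(" ");

    /** Splits the way the test code does; consecutive spaces yield empty strings. */
    public static List<String> words(String line) {
        return Arrays.asList(SPACE.split(line));
    }

    /** Same split, but drops the empty strings so they are never counted. */
    public static List<String> nonEmptyWords(String line) {
        List<String> result = new ArrayList<>();
        for (String word : SPACE.split(line)) {
            if (!word.isEmpty()) {
                result.add(word);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(words("HDP  2.6"));         // prints [HDP, , 2.6]
        System.out.println(nonEmptyWords("HDP  2.6")); // prints [HDP, 2.6]
    }
}
```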

Comments

Do you have any experience deploying such a Spark job on Oozie? I tried many times but always failed to make Spark read the JAAS file on Oozie. Running from the command line works fine.

Hi @gabriele ran, have you managed to make Spark read the JAAS file while using Oozie?


We are facing the same issue in our PySpark streaming job. Could you please let me know whether this can be handled in Python Spark as well?

https://stackoverflow.com/questions/58755063/failed-to-find-leader-for-topics-java-lang-nullpointere...