Most online articles already cover the code and the submission command for connecting Spark to Kafka in a Kerberos environment. Here I discuss other issues you will probably run into while debugging, the kind that are easily overlooked. I went through them the hard way, but no pain, no gain. I record them here in the hope that they help you.
The environment is HDP kerberized with FreeIPA, Spark 1.6.3, and Kafka 0.10.1. I borrowed the test code from the link below and added the lines that let it run in a kerberized environment.
    package spark.example;

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Arrays;
    import java.util.regex.Pattern;

    import scala.Tuple2;

    import com.google.common.collect.Lists;
    import kafka.serializer.StringDecoder;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.Durations;

    public final class JavaDirectKafkaWordCount {
        private static final Pattern SPACE = Pattern.compile(" ");

        public static void main(String[] args) {
            // Create context with a 200-second batch interval
            SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(200));

            HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("chen"));
            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("bootstrap.servers", "");
            kafkaParams.put("", "test_sparkstreaming_kafka");
            kafkaParams.put("auto.offset.reset", "largest");
            // Lines added so that the job can run in the kerberized cluster
            kafkaParams.put("security.protocol", "PLAINTEXTSASL");
            System.setProperty("", "/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf");
            System.setProperty("", "/etc/krb5.conf");

            // Create direct kafka stream with brokers and topics
            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
            );

            // Get the lines, split them into words, count the words and print
            JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String x) {
                    return Lists.newArrayList(SPACE.split(x));
                }
            });
            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
            wordCounts.print();

            // Start the computation
            jssc.start();
            jssc.awaitTermination();
        }
    }
The JAAS configuration file wraps the principal and the path of its keytab. Here I was lazy and used the kafka service account directly; you should create a dedicated user account for this instead.
    [root@cheny0 ~]# cat /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf
    KafkaClient {
       required
      useKeyTab=true
      doNotPrompt=true
      principal="kafka/"
      keyTab="kafka.service.keytab"
      useTicketCache=true
      renewTicket=true
      serviceName="kafka";
    };
    Client {
       required
      useKeyTab=true
      doNotPrompt=true
      principal="kafka/"
      keyTab="kafka.service.keytab"
      useTicketCache=true
      renewTicket=true
      serviceName="kafka";
    };
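A malformed JAAS file is easier to catch before submitting than inside a YARN container. As a minimal sketch, the JDK's own `JavaLoginConfig` parser can be asked whether a file parses and whether it contains a given entry. The class name `JaasCheck`, the helper names, and the sample text are hypothetical; in particular, the login module class `com.sun.security.auth.module.Krb5LoginModule` in the sample is my assumption, since the module name does not appear in the listing above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasCheck {

    // Hypothetical sample used when no path is given; the module class is an assumption.
    public static final String SAMPLE =
        "KafkaClient {\n"
        + "  com.sun.security.auth.module.Krb5LoginModule required\n"
        + "  useKeyTab=true\n"
        + "  keyTab=\"kafka.service.keytab\"\n"
        + "  serviceName=\"kafka\";\n"
        + "};\n";

    /** Returns how many login modules are configured under entryName (0 if the entry is absent). */
    public static int countModules(Path jaasFile, String entryName) {
        try {
            // "JavaLoginConfig" is the JDK's file-based JAAS configuration type.
            Configuration conf = Configuration.getInstance(
                "JavaLoginConfig", new URIParameter(jaasFile.toUri()));
            AppConfigurationEntry[] entries = conf.getAppConfigurationEntry(entryName);
            return entries == null ? 0 : entries.length;
        } catch (NoSuchAlgorithmException e) {
            // Unreadable or syntactically broken files surface here.
            throw new IllegalStateException("Could not load JAAS file " + jaasFile, e);
        }
    }

    /** Writes text to a temporary file so that the sample can be parsed like a real file. */
    public static Path writeTemp(String text) {
        try {
            Path p = Files.createTempFile("jaas", ".conf");
            Files.write(p, text.getBytes(StandardCharsets.UTF_8));
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = args.length > 0 ? Paths.get(args[0]) : writeTemp(SAMPLE);
        System.out.println("KafkaClient modules: " + countModules(file, "KafkaClient"));
        System.out.println("Client modules: " + countModules(file, "Client"));
    }
}
```

Run it with the path of your own JAAS file as the only argument. This only checks that the file parses and which entries it has; it does not attempt a Kerberos login.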
A bit more needs to be said about the submission command. To make it convenient to check logs in the Spark History Server, on HDP 2.5 and earlier versions you have to specify the server address in the command:
    # spark job for kerberized HDP 2.5
    spark-submit --master=yarn --deploy-mode=cluster \
      --files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
      --conf "" \
      --conf "" \
      --conf "spark.yarn.historyServer.address=" \
      --conf "spark.eventLog.dir=hdfs:///spark-history" \
      --conf "spark.eventLog.enabled=true" \
      --jars /usr/hdp/current/spark-client/lib/spark-assembly-,/usr/hdp/current/kafka-broker/libs/kafka_2.10-,/usr/hdp/current/spark-client/lib/spark-examples- \
      --class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar

This is not needed on HDP 2.6:

    # spark job for kerberized HDP 2.6
    spark-submit --master=yarn --deploy-mode=cluster \
      --files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
      --conf "" \
      --conf "" \
      --jars /usr/hdp/current/spark-client/lib/spark-assembly-,/usr/hdp/current/kafka-broker/libs/kafka_2.10-,/usr/hdp/current/spark-client/lib/spark-examples- \
      --class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar
For an explanation of the code and command above, refer to the linked document.
That completes the background. Now let's step back to the issues themselves; I will walk you through them.
ZooKeeper access rights check
Because Kafka needs to read topic and offset information from ZooKeeper znodes, check that the ACLs allow it:
    [root@cheny0 tmp]# cd /usr/hdp/current/zookeeper-client/bin
    [root@cheny0 bin]# ./ -server cheny0:2181
    [zk: cheny0:2181(CONNECTED) 0] getAcl /
    'world,'anyone
    : cdrwa
    [zk: cheny0:2181(CONNECTED) 2] getAcl /consumers
    'world,'anyone
    : cdrwa
    [zk: cheny0:2181(CONNECTED) 4] getAcl /brokers
    'world,'anyone
    : cdrwa
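The letters in `cdrwa` are ZooKeeper's permission flags: create, delete, read, write, and admin. As a small sketch for reading an ACL line at a glance, the following expands such a string into named permissions. The class name `ZkAclPerms` and its methods are hypothetical helpers, not part of any ZooKeeper API.

```java
import java.util.EnumSet;
import java.util.Set;

public class ZkAclPerms {

    public enum Perm { CREATE, DELETE, READ, WRITE, ADMIN }

    /** Expands a permission string as printed by getAcl, such as "cdrwa". */
    public static Set<Perm> parse(String letters) {
        Set<Perm> perms = EnumSet.noneOf(Perm.class);
        for (char c : letters.trim().toCharArray()) {
            switch (c) {
                case 'c': perms.add(Perm.CREATE); break;
                case 'd': perms.add(Perm.DELETE); break;
                case 'r': perms.add(Perm.READ); break;
                case 'w': perms.add(Perm.WRITE); break;
                case 'a': perms.add(Perm.ADMIN); break;
                default:
                    throw new IllegalArgumentException("Unknown ACL letter: " + c);
            }
        }
        return perms;
    }

    public static void main(String[] args) {
        String letters = args.length > 0 ? args[0] : "cdrwa";
        System.out.println(letters + " -> " + parse(letters));
    }
}
```

If the set for `/brokers` or `/consumers` lacks `READ` for the identity your job uses, that is a likely place to start looking.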
HDFS access rights check
    [root@cheny0 ~]# hdfs dfs -ls /user
    Found 10 items
    drwxrwx---   - ambari-qa hdfs          0 2017-09-17 13:34 /user/ambari-qa
    drwxr-xr-x   - hbase     hdfs          0 2017-08-20 00:08 /user/hbase
    drwxr-xr-x   - hcat      hdfs          0 2017-08-06 00:55 /user/hcat
    drwx------   - hdfs      hdfs          0 2017-09-28 03:10 /user/hdfs
    drwxr-xr-x   - hive      hdfs          0 2017-08-06 00:55 /user/hive
    drwxr-xr-x   - kafka     hdfs          0 2017-09-28 12:23 /user/kafka
    drwxrwxr-x   - oozie     hdfs          0 2017-08-06 00:56 /user/oozie
    drwxr-xr-x   - hdfs      hdfs          0 2017-08-13 11:47 /user/root
    drwxrwxr-x   - spark     hdfs          0 2017-09-08 16:54 /user/spark
    drwxr-xr-x   - yarn      hdfs          0 2017-09-16 22:17 /user/yarn
Note that the path "/user/kafka" is not created by default, so you need to create it by hand and give it the right ownership:
    [root@cheny0 ~]# hdfs dfs -mkdir /user/kafka
    [root@cheny0 ~]# hdfs dfs -chown kafka:hdfs /user/kafka
JAAS configuration file and keytab file access rights
The Spark Streaming job here is submitted in yarn cluster mode, so it acts as the yarn user when reading the JAAS and keytab files; check the permissions on both files. By default a keytab file can be read only by the user of its principal, and other users cannot access it. You should grant other users read access to those two files.
    [root@cheny0 ~]# ls -l /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf
    -rw-r--r-- 1 kafka hadoop 560 Sep 26 13:35 /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf
    [root@cheny0 ~]# ls -l /etc/security/keytabs/kafka.service.keytab
    -r--r--r-- 1 kafka hadoop 208 Aug  6 01:53 /etc/security/keytabs/kafka.service.keytab
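To make the check on the mode column concrete, here is a minimal sketch that takes the first column of an `ls -l` line and reports whether the group and other users can read the file. The class name `ModeCheck` and its methods are hypothetical; it only interprets the mode string and does not touch the file system.

```java
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

public class ModeCheck {

    /** Parses the nine permission characters that follow the file-type character. */
    private static Set<PosixFilePermission> parse(String lsMode) {
        // "-r--r--r--" -> "r--r--r--"
        return PosixFilePermissions.fromString(lsMode.substring(1, 10));
    }

    /** True if users outside the owning user and group may read the file. */
    public static boolean othersCanRead(String lsMode) {
        return parse(lsMode).contains(PosixFilePermission.OTHERS_READ);
    }

    /** True if members of the owning group may read the file. */
    public static boolean groupCanRead(String lsMode) {
        return parse(lsMode).contains(PosixFilePermission.GROUP_READ);
    }

    public static void main(String[] args) {
        // Modes as shown by ls -l: the keytab above, and a typical owner-only keytab.
        for (String mode : new String[] {"-r--r--r--", "-r--------"}) {
            System.out.println(mode + " group=" + groupCanRead(mode)
                + " others=" + othersCanRead(mode));
        }
    }
}
```

Group read may already be enough if the user that reads the files belongs to the owning group (`hadoop` in the listing above); whether that holds depends on your cluster's user setup.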
Specify the paths of the JAAS and keytab files
As most Spark articles emphasize, the Spark Streaming driver wraps up the JAAS and keytab files and sends them to the executors to parse. The important step is to specify their paths on the driver machine. There is no need to copy those files by hand to all the slave nodes.
    --files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
    --conf "" \
    --conf "" \
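Files passed with `--files` are placed in the working directory of each container under their base names, which is why the JAAS file shown earlier can refer to the keytab as plain `kafka.service.keytab`. A minimal sketch of that mapping, useful for checking that the `keyTab` value in the JAAS file matches one of the shipped names, follows. The class name `ShippedFiles` and its method are hypothetical.

```java
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class ShippedFiles {

    /** Returns the base name of every path in a comma-separated --files argument. */
    public static List<String> localizedNames(String filesArg) {
        List<String> names = new ArrayList<>();
        for (String path : filesArg.split(",")) {
            String trimmed = path.trim();
            if (!trimmed.isEmpty()) {
                names.add(Paths.get(trimmed).getFileName().toString());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String filesArg = "/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,"
            + "/etc/security/keytabs/kafka.service.keytab";
        List<String> names = localizedNames(filesArg);
        System.out.println("Names visible in the container: " + names);
        // The keyTab value from the JAAS file must be one of these names.
        System.out.println("keytab shipped: " + names.contains("kafka.service.keytab"));
    }
}
```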
If you do not pay enough attention to the points above, you will likely come across an error saying
org.apache.spark.SparkException: Couldn't connect to leader for topic
or an error about being unable to obtain a password from the user, or
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:Jaas configuration not found
If all goes well, we can move on to some points worth noting when debugging.
Use Yarn to kill applications
The test code never stops on its own because it is a streaming process. A submission that has failed may still be running and holding Yarn queue resources. If you do not clear it, successive submissions will be blocked and queued. It is better to use the yarn command to kill such applications.
    [root@cheny0 tmp]# yarn application -list -appStates RUNNING
    17/09/30 22:22:08 INFO client.RMProxy: Connecting to ResourceManager at
    17/09/30 22:22:09 INFO client.AHSProxy: Connecting to Application History server at
    Total number of applications (application-types: [] and states: [RUNNING]):3
    Application-Id                  Application-Name                        Application-Type  User   Queue    State    Final-State  Progress  Tracking-URL
    application_1505572583831_0056  spark.example.JavaDirectKafkaWordCount  SPARK             kafka  default  RUNNING  UNDEFINED    10%
    application_1505572583831_0057  spark.example.JavaDirectKafkaWordCount  SPARK             kafka  default  RUNNING  UNDEFINED    10%
    application_1505572583831_0058  spark.example.JavaDirectKafkaWordCount  SPARK             kafka  default  RUNNING  UNDEFINED    10%
    [root@cheny0 tmp]# yarn application -kill application_1505572583831_0056
    17/09/30 22:22:33 INFO client.RMProxy: Connecting to ResourceManager at
    17/09/30 22:22:34 INFO client.AHSProxy: Connecting to Application History server at
    Killing application application_1505572583831_0056
    17/09/30 22:22:34 INFO impl.YarnClientImpl: Killed application application_1505572583831_0056
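When several failed submissions pile up, picking the IDs out of the listing by hand gets tedious. As a minimal sketch, assuming you have captured the text output of `yarn application -list` yourself, the following pulls out the IDs of running applications with a given name. The class name `YarnAppIds` and its method are hypothetical; the sketch only parses text and leaves the actual `yarn application -kill` call to you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class YarnAppIds {

    // Matches IDs such as application_1505572583831_0056.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    /** Returns the IDs on lines that mention appName and the RUNNING state. */
    public static List<String> runningIds(String listOutput, String appName) {
        List<String> ids = new ArrayList<>();
        for (String line : listOutput.split("\\R")) {
            if (!line.contains(appName) || !line.contains("RUNNING")) {
                continue;
            }
            Matcher m = APP_ID.matcher(line);
            if (m.find()) {
                ids.add(m.group());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        String sample =
            "Application-Id\tApplication-Name\tApplication-Type\tUser\tQueue\tState\n"
            + "application_1505572583831_0056\tspark.example.JavaDirectKafkaWordCount\tSPARK\tkafka\tdefault\tRUNNING\n"
            + "application_1505572583831_0057\tspark.example.JavaDirectKafkaWordCount\tSPARK\tkafka\tdefault\tRUNNING\n";
        for (String id : runningIds(sample, "JavaDirectKafkaWordCount")) {
            System.out.println("yarn application -kill " + id);
        }
    }
}
```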
Use the Yarn resource manager and History Server web pages to check logs
The Yarn resource manager lets you dive into the containers' logs, since Spark executes its tasks distributed across containerized executors. The History Server lets you check the Spark DAG and the full stack trace printed on each executor, and it also collects metrics that help with problem analysis.
Maven: specify the HDP repository and dependent jars for compiling
The dependency versions should align with the HDP release, and the jar files should be resolved from the official HDP repository.
    <repositories>
      <repository>
        <releases>
          <enabled>true</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
        <id>hortonworks.extrepo</id>
        <name>Hortonworks HDP</name>
        <url></url>
      </repository>
      <repository>
        <releases>
          <enabled>true</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
        <id>hortonworks.other</id>
        <name>Hortonworks Other Dependencies</name>
        <url></url>
      </repository>
    </repositories>
This test code has the following dependencies:
    <dependencies>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version></version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.3</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version></version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version></version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
Once the target jar is compiled, submit only the jar without its dependencies bundled, and specify the paths of the dependent jars in the command:
    --jars /usr/hdp/current/spark-client/lib/spark-assembly-,/usr/hdp/current/kafka-broker/libs/kafka_2.10-,/usr/hdp/current/spark-client/lib/spark-examples- \
    --class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar
Use the Kafka tools to test
After starting the streaming process above, start a producer with
    [root@cheny0 bin]# ./ --broker-list cheny0:6667 --topic chen --security-protocol PLAINTEXTSASL
    Other Notable Pits for Sparkstreaming Connecting Kafka in Kerberized HDP 2.6
Here I typed the title of this article as input, then checked the output log generated by the Spark Streaming process, from either Yarn or the Spark History Server.
    -------------------------------------------
    Time: 1506791000000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1506791200000 ms
    -------------------------------------------
    (Kafka,1)
    (2.6,1)
    (HDP,1)
    (Other,1)
    (,1)
    (Connecting,1)
    (Kerberized,1)
    (Notable,1)
    (Issue,1)
    (in,1)
    ...

    -------------------------------------------
    Time: 1506791400000 ms
    -------------------------------------------
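The `(,1)` entry in the output is a count for an empty-string word. One plausible cause, assuming the typed line contained two consecutive spaces or a leading space, is that splitting on a single space produces an empty token. The sketch below reproduces the same split-and-count logic in plain Java without Spark; the class name `SplitCount` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class SplitCount {

    // Same pattern as in the streaming job: a single space.
    private static final Pattern SPACE = Pattern.compile(" ");

    /** Counts the tokens of one line, keeping first-seen order. */
    public static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : SPACE.split(line)) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two spaces between the words yield an empty token between them.
        System.out.println(count("Kerberized  HDP"));
    }
}
```

If the empty word bothers you, filtering out empty strings after the split, or splitting on a run of whitespace instead of one space, would avoid it.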
When you see the result above, the job has run successfully. Last but not least: when debugging Spark Streaming access to Kafka in a kerberized environment, you may run into unexpected issues from every direction. If you are not sure about your code, split it into parts and verify them one by one, rather than tangling everything together and getting lost. That is my summary of a painful experience. I hope things go more smoothly for you than they did for me. Stay awesome!
