Member since
01-11-2016
34
Posts
8
Kudos Received
5
Solutions
My Accepted Solutions
Views | Posted |
---|---|
6222 | 06-21-2017 06:09 AM |
36807 | 06-16-2017 03:52 PM |
22014 | 12-12-2016 02:30 PM |
3937 | 09-01-2016 08:40 AM |
3244 | 05-09-2016 05:01 PM |
11-25-2017
01:25 AM
2 Kudos
OK, the problem is that for some reason spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar is not loaded, so lookupDataSource does not see KafkaSourceProvider as an implementation of the DataSourceRegister trait. Without it there is no override def shortName(): String = "kafka" to match, and when nothing matches, Spark falls back to appending DefaultSource to the given data source provider name. That is why I got class not found kafka.DefaultSource.
The solution is to:
1. Get the jar spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar from somewhere, either
a) by manually downloading it from the Cloudera repo: https://repository.cloudera.com/cloudera/cloudera-repos/org/apache/spark/spark-sql-kafka-0-10_2.11/2.1.0.cloudera1/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar
or
b) by running spark2-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 as Jacek Laskowski explained: https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-sql-streaming-KafkaSource.adoc
2. Run your app via spark2-submit --jars ~/.ivy2/cache/org.apache.spark/spark-sql-kafka-0-10_2.11/jars/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar
Very important: remember to set the proper Kafka version (in this case 0.10) via export (SPARK_KAFKA_VERSION=0.10) or in the Spark2 service configuration in Cloudera Manager: https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html
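For illustration only, the --packages variant could be wrapped in a small dry-run script like the one below. This is a sketch, not the exact command I ran: APP_JAR is a hypothetical path and the RUN switch is my own convention, while the package coordinate and SPARK_KAFKA_VERSION come from the steps above.

```shell
#!/bin/sh
# Dry-run sketch of the submit command described above.
# APP_JAR is a hypothetical path; replace it with your own assembly jar.
KAFKA_SQL_PKG="org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0"
APP_JAR="${APP_JAR:-target/scala-2.11/myetl.jar}"

# Kafka client version expected by the Cloudera Spark 2 build (0.10 here).
SPARK_KAFKA_VERSION=0.10
export SPARK_KAFKA_VERSION

submit() {
  # Build the argument list once so the printed and executed commands match.
  set -- spark2-submit --packages "$KAFKA_SQL_PKG" "$APP_JAR"
  if [ "${RUN:-0}" = "1" ]; then
    "$@"          # RUN=1: actually submit the application
  else
    echo "$*"     # default: only print what would be run
  fi
}

submit
```

Running it without RUN=1 only prints the command, which makes it easy to check the coordinate before anything is downloaded or submitted.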
11-23-2017
08:43 AM
Hi,
I use CDH 5.10.1, Spark 2.1 and Kafka 2.1.
When I try a simple ETL program:
val mystream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mybroker:9092")
  .option("subscribe", "mytopic")
  .load()
with these dependencies in build.sbt:
scalaVersion := "2.11.8"
lazy val spark = Seq(
  "spark-core",
  "spark-hive",
  "spark-streaming",
  "spark-sql",
  "spark-streaming-kafka-0-10"
).map("org.apache.spark" %% _ % "2.1.0.cloudera1" % "provided")
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0.cloudera1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0-kafka-2.1.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.10.0-kafka-2.1.0"
libraryDependencies ++= spark
resolvers ++= Seq(
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)
I get this error when submitting my app:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:594)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:197)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
at myetl.Main$.main(Main.scala:20)
at myetl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:579)
... 18 more
Please help...
Labels: Apache Hive, Apache Kafka, Apache Spark
08-11-2017
03:16 AM
So, summarizing: when starting Spark with --deploy-mode client, one must distribute the JAAS file and keytab to every machine. When starting Spark with --deploy-mode cluster, one must distribute the JAAS file and keytab with --files and/or --keytab.
06-21-2017
06:09 AM
Well, as far as I can read the code I've cited, the problem occurs when Kafka wants to list roles, and it wants to do this when caching of Sentry privileges is enabled. When I:
1. Disabled sentry.kafka.caching.enable in the Kafka configuration
2. Deleted the kafka group from sentry.service.admin.group in the Sentry configuration, so that the configuration is:
root@node1:~# cd `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | head -n1`
root@node1:/var/run/cloudera-scm-agent/process/1442-sentry-SENTRY_SERVER# grep -A 1 -E "sentry.service.(allow.connect|admin.group)" sentry-site.xml
<name>sentry.service.admin.group</name>
<value>hive,impala,hue,sudo</value>
--
<name>sentry.service.allow.connect</name>
<value>hive,impala,hue,hdfs,solr,kafka</value>
root@node1:/var/run/cloudera-scm-agent/process/1442-sentry-SENTRY_SERVER#
3. Deployed the client configuration and restarted the dependent services
After these steps Kafka started properly, so turning off caching of Sentry privileges was a workaround that let me start Kafka without errors.
However, I have problems when using the kafka-sentry tool:
root@node1:~# kinit isegrim
Password for isegrim@TEST.COM:
root@node1:~# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: isegrim@TEST.COM
Valid starting Expires Service principal
21/06/2017 15:01 22/06/2017 01:01 krbtgt/TEST.COM@TEST.COM
renew until 28/06/2017 15:01
root@node1:~# kafka-sentry --config `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | head -n1` -lp -r myrole
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2017-06-21 15:02:16,992] ERROR Config key sentry.service.client.server.rpc-address is required (org.apache.sentry.provider.db.generic.tools.SentryShellKafka)
java.lang.NullPointerException: Config key sentry.service.client.server.rpc-address is required
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl.<init>(SentryGenericServiceClientDefaultImpl.java:123)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory.create(SentryGenericServiceClientFactory.java:31)
at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.run(SentryShellKafka.java:51)
at org.apache.sentry.provider.db.tools.SentryShellCommon.executeShell(SentryShellCommon.java:241)
at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.main(SentryShellKafka.java:96)
The operation failed. Message: Config key sentry.service.client.server.rpc-address is required
root@node1:~#
I can't see this configuration option in CM. I do see that rpc-address is configured in the CDH 5.10 Sentry service configuration, but without an explanation of what exactly the address should be (or I don't see it): https://www.cloudera.com/documentation/enterprise/5-10-x/topics/sg_sentry_service_config.html
Besides that, I have the server address set and working:
root@node1:~# grep -A 1 rpc `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | head -n1`/sentry-site.xml
<name>sentry.service.server.rpc-address</name>
<value>node1</value>
--
<name>sentry.service.server.rpc-port</name>
<value>8038</value>
root@node1:~#
root@node1:~# ps -ef | grep `netstat -anpt | grep LISTEN | grep ':8038' | awk '{print $7}' | awk -F '/' '{print $1}'`
sentry 4599 2654 0 14:35 ? 00:00:20 /usr/lib/jvm/java-8-oracle/jre/bin/java -Xmx1000m -Dhadoop.log.dir=/opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.41/lib/hadoop/logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.41/lib/hadoop -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Djava.library.path=/opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.41/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Xms268435456 -Xmx268435456 -XX:OnOutOfMemoryError=/usr/lib/cmf/service/common/killparent.sh -Dhadoop.security.logger=INFO,NullAppender org.apache.hadoop.util.RunJar /opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.41/lib/sentry/lib/sentry-core-common-1.5.1-cdh5.10.0.jar org.apache.sentry.SentryMain --command service --log4jConf /run/cloudera-scm-agent/process/1442-sentry-SENTRY_SERVER/sentry-log4j.properties -conffile /run/cloudera-scm-agent/process/1442-sentry-SENTRY_SERVER/sentry-site.xml
sentry 4616 4599 0 14:35 ? 00:00:00 python2.7 /usr/lib/cmf/agent/build/env/bin/cmf-redactor /usr/lib/cmf/service/sentry/sentry.sh
root@node1:~#
One more update: when I run the kafka-sentry command while logged in to Kerberos as the kafka user, it gives me the same error as before I disabled Sentry privilege caching in Kafka:
root@node2:~# cd `ls -dt /var/run/cloudera-scm-agent/process/*kafka* | head -n1`
root@node2:/var/run/cloudera-scm-agent/process/1445-kafka-KAFKA_BROKER# kinit -kt kafka.keytab kafka/node2@TEST.COM
root@node2:/var/run/cloudera-scm-agent/process/1445-kafka-KAFKA_BROKER# kafka-sentry -lp -r zto
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/06/21 17:18:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/06/21 17:18:10 ERROR tools.SentryShellKafka: Access denied to kafka. Server Stacktrace: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$10.handle(SentryGenericPolicyProcessor.java:607)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.list_sentry_privileges_by_role(SentryGenericPolicyProcessor.java:599)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:977)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:962)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorWrapper.process(SentryGenericPolicyProcessorWrapper.java:37)
at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka. Server Stacktrace: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$10.handle(SentryGenericPolicyProcessor.java:607)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.list_sentry_privileges_by_role(SentryGenericPolicyProcessor.java:599)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:977)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:962)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorWrapper.process(SentryGenericPolicyProcessorWrapper.java:37)
at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
at org.apache.sentry.service.thrift.Status.throwIfNotOk(Status.java:113)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl.listPrivilegesByRoleName(SentryGenericServiceClientDefaultImpl.java:484)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl.listPrivilegesByRoleName(SentryGenericServiceClientDefaultImpl.java:494)
at org.apache.sentry.provider.db.generic.tools.command.ListPrivilegesByRoleCmd.execute(ListPrivilegesByRoleCmd.java:45)
at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.run(SentryShellKafka.java:83)
at org.apache.sentry.provider.db.tools.SentryShellCommon.executeShell(SentryShellCommon.java:241)
at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.main(SentryShellKafka.java:96)
The operation failed. Message: Access denied to kafka. Server Stacktrace: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$10.handle(SentryGenericPolicyProcessor.java:607)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.list_sentry_privileges_by_role(SentryGenericPolicyProcessor.java:599)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:977)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:962)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorWrapper.process(SentryGenericPolicyProcessorWrapper.java:37)
at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
root@node2:/var/run/cloudera-scm-agent/process/1445-kafka-KAFKA_BROKER#
When I add the kafka group to the Sentry admin groups (sentry.service.admin.group), it looks like everything is working, but only for the user kafka logged in via Kerberos:
root@node2:~# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/node2@TEST.COM
Valid starting Expires Service principal
21/06/2017 17:16 22/06/2017 03:16 krbtgt/TEST.COM@TEST.COM
renew until 28/06/2017 17:16
root@node2:~# kafka-sentry -lp -r myrole
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/06/21 17:31:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root@node2:~#
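To compare which rpc-related keys a given configuration directory really defines (for example sentry.service.server.rpc-address versus the sentry.service.client.server.rpc-address that kafka-sentry asks for), a small helper along these lines may be handy. It is only a sketch: get_prop is a name I made up, and it assumes the name and value tags sit on consecutive lines, as in the grep output above.

```shell
#!/bin/sh
# Print the <value> that follows a given <name> in a Hadoop-style site file.
# Assumes <name> and <value> are on consecutive lines, as in the output above.
get_prop() {
  # $1 = path to the XML file, $2 = exact property name
  grep -F -A 1 "<name>$2</name>" "$1" |
    sed -n 's:.*<value>\(.*\)</value>.*:\1:p'
}

# Hypothetical usage against the newest Sentry process directory:
#   conf=$(ls -dt /var/run/cloudera-scm-agent/process/*sentry* | head -n1)/sentry-site.xml
#   get_prop "$conf" sentry.service.server.rpc-address
#   get_prop "$conf" sentry.service.client.server.rpc-address   # prints nothing if unset
```

An empty result for the client key would at least confirm that the file passed via --config does not contain the key the tool complains about.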
06-21-2017
03:35 AM
Hi,
I've checked the code and run some tests, and it looks like Sentry, besides the sentry.service.allow.connect setting, also needs the kafka group to be added as an admin group in the sentry.service.admin.group setting. This can be a security problem, because anyone in the kafka group will be able to do anything on the cluster.
The handle() method in org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor (seen in both the Kafka and the Sentry stack traces) requires the user to be in one of the Sentry admin groups. After I added the kafka group, the Kafka brokers started properly without errors:
https://github.com/cloudera/sentry/blob/cdh5-1.5.1_5.10.0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
public Response<Set<TSentryRole>> handle() throws Exception {
validateClientVersion(request.getProtocol_version());
Set<String> groups = getRequestorGroups(conf, request.getRequestorUserName());
if (AccessConstants.ALL.equalsIgnoreCase(request.getGroupName())) {
//check all groups which requestorUserName belongs to
} else {
boolean admin = inAdminGroups(groups);
//Only admin users can list all roles in the system ( groupname = null)
//Non admin users are only allowed to list only groups which they belong to
if(!admin && (request.getGroupName() == null || !groups.contains(request.getGroupName()))) {
throw new SentryAccessDeniedException(ACCESS_DENIAL_MESSAGE + request.getRequestorUserName());
}
groups.clear();
groups.add(request.getGroupName());
}
I don't see the sense in adding the kafka group to sentry.service.admin.group because, as the Sentry documentation says: https://cwiki.apache.org/confluence/display/SENTRY/Sentry+Service+Configuration
sentry.service.admin.group - List of groups allowed to make policy updates
The kafka group should contain only the kafka service, which only needs to check permissions in Sentry. It needs no write permission for policy changes, because Kafka itself should not make any policy changes.
I've double-checked for possible typos but found none. I both copy/pasted and typed "kafka" by hand in every field in CM, restarted the cluster and deployed the client configuration, but the result above was the only one.
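To make the effect of the quoted else-branch easier to see, here is a rough shell model of the decision. It is my own simplification, not Sentry code: in_list and may_list_roles are made-up names, groups are passed as comma-separated strings, an empty requested group stands for the null case, and the branch for AccessConstants.ALL is ignored.

```shell
#!/bin/sh
# in_list LIST ITEM: succeed if ITEM is an element of the comma-separated LIST.
in_list() {
  case ",$1," in
    *",$2,"*) return 0 ;;
    *)        return 1 ;;
  esac
}

# may_list_roles ADMIN_GROUPS USER_GROUPS REQUESTED_GROUP
# Models the quoted check: an admin may list anything; a non-admin may only
# ask for a group they belong to. An empty REQUESTED_GROUP means "all roles".
may_list_roles() {
  # The user counts as admin if any of their groups is an admin group.
  old_ifs=$IFS; IFS=,
  for g in $2; do
    if in_list "$1" "$g"; then IFS=$old_ifs; return 0; fi
  done
  IFS=$old_ifs
  # Non-admin: allowed only for a non-empty group the user is a member of.
  [ -n "$3" ] && in_list "$2" "$3"
}

# Example with the admin groups from my sentry-site.xml:
#   may_list_roles "hive,impala,hue,sudo" "kafka" ""        -> denied
#   may_list_roles "hive,impala,hue,sudo,kafka" "kafka" ""  -> allowed
```

Under this reading, a kafka user that asks for all roles (no group name) is denied unless kafka is an admin group, which matches what I observed.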
06-20-2017
01:18 PM
Hi pdvorak!
Thank you for your fast reply. Yes, this is very strange to me too, which is why I decided to turn to the community.
I have 4 nodes, where node1 has the Sentry Server and node{2..4} are worker nodes (HDFS DataNode + YARN NodeManager).
id kafka:
isegrim@node1:~$ for i in {1..4}; do ssh node$i "id kafka"; done
uid=998(kafka) gid=999(kafka) groups=999(kafka)
uid=998(kafka) gid=999(kafka) groups=999(kafka)
uid=998(kafka) gid=999(kafka) groups=999(kafka)
uid=998(kafka) gid=999(kafka) groups=999(kafka)
isegrim@node1:~$
Stack trace:
isegrim@node1:~$ grep '2017-06-20 16:01:30.* ERROR' -A 16 /var/log/sentry/hadoop-cmf-sentry-SENTRY_SERVER-node1.log.out
2017-06-20 16:01:30,840 ERROR org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor: Access denied to kafka
org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$9.handle(SentryGenericPolicyProcessor.java:575)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.list_sentry_roles_by_group(SentryGenericPolicyProcessor.java:563)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_roles_by_group.getResult(SentryGenericPolicyService.java:957)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_roles_by_group.getResult(SentryGenericPolicyService.java:942)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorWrapper.process(SentryGenericPolicyProcessorWrapper.java:37)
at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:01:39,091 ERROR org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor: Access denied to kafka
org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
isegrim@node1:~$
Confirmation of the sentry.service.allow.connect setting:
root@node1:~# cd `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | tail -n1`
root@node1:/var/run/cloudera-scm-agent/process/1365-sentry-SENTRY_SERVER# grep -A 1 sentry.service.allow.connect sentry-site.xml
<name>sentry.service.allow.connect</name>
<value>hive,impala,hue,hdfs,solr,kafka</value>
root@node1:/var/run/cloudera-scm-agent/process/1365-sentry-SENTRY_SERVER#
Kafka configuration for Sentry (in CM).
Kafka configuration - users (in CM).
Sentry configuration - sentry.service.allow.connect (in CM).
I have no idea what I have done wrong or failed to do. Please help.
Best Regards
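The uid/gid comparison across nodes can also be automated instead of being read by eye. The following is a minimal sketch; all_same is a hypothetical helper name, and the node names in the usage comment are the ones from this cluster.

```shell
#!/bin/sh
# all_same: read lines on stdin and succeed only if there is at least one
# line and all lines are identical.
all_same() {
  [ "$(sort -u | wc -l | tr -d ' ')" = "1" ]
}

# Hypothetical usage for the check shown above:
#   for i in 1 2 3 4; do ssh "node$i" "id kafka"; done | all_same \
#     && echo "kafka uid/gid consistent" \
#     || echo "kafka uid/gid differ between nodes"
```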
06-20-2017
08:35 AM
Hello,
I have CDH 5.10.1, KAFKA2.1 (0.10) - all of them kerberized. I wanted to use Sentry with Kafka as this procedure describes: https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#using_kafka_with_sentry
Following the procedure, I completed all 5 points, then deployed the client configuration and restarted the dependent services. Kafka then did not start, giving this error:
2017-06-20 16:01:30,892 ERROR org.apache.sentry.kafka.binding.KafkaAuthBindingSingleton: Unable to create KafkaAuthBinding
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.sentry.kafka.binding.KafkaAuthBinding.createAuthProvider(KafkaAuthBinding.java:194)
at org.apache.sentry.kafka.binding.KafkaAuthBinding.<init>(KafkaAuthBinding.java:97)
at org.apache.sentry.kafka.binding.KafkaAuthBindingSingleton.configure(KafkaAuthBindingSingleton.java:63)
at org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer.configure(SentryKafkaAuthorizer.java:120)
at kafka.server.KafkaServer$$anonfun$startup$3.apply(KafkaServer.scala:211)
at kafka.server.KafkaServer$$anonfun$startup$3.apply(KafkaServer.scala:209)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaServer.startup(KafkaServer.scala:209)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
at kafka.Kafka$.main(Kafka.scala:67)
at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:76)
at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)
Caused by: java.lang.RuntimeException: Failed to get privileges from Sentry to build cache.
at org.apache.sentry.provider.db.generic.SentryGenericProviderBackend.initialize(SentryGenericProviderBackend.java:89)
at org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine.<init>(SimpleKafkaPolicyEngine.java:44)
... 16 more
Caused by: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka. Server Stacktrace: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$9.handle(SentryGenericPolicyProcessor.java:575)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
The same messages are in the Sentry log. I've checked the Sentry configuration, and the kafka user is configured as one of the service users in the sentry.service.allow.connect setting. kafka is a local (not LDAP) user that has the same uid and gid in Linux on every cluster node.
Can someone tell me what else I should do to allow the kafka user to query Sentry?
Best Regards
Labels: Apache Sentry, Kerberos
06-17-2017
02:41 AM
No, no, jaas.conf before distributing it to worker nodes was:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./isegrim.keytab"
principal="isegrim@TEST.COM";
};
After I figured out that the KafkaConsumer constructor looks for jaas.conf and the keytab in the local filesystem of the executor container's machine instead of the application's HDFS CWD, I distributed the keytab to the Linux filesystem and changed the keytab path from ./isegrim.keytab (I thought it would be relative to the Spark application's HDFS CWD) to /home/isegrim/isegrim.keytab.
I agree that it should not work like this. Now I have to take care of distributing and securing the keytab and jaas.conf files myself.
I think it must be something in the KafkaConsumer constructor, which doesn't know that those files are distributed with the application in its CWD on HDFS and tries to look for them in the local machine's filesystem. If I have more time, I'll try to check the code of those constructors to figure it out.
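Since the relative keytab path was what bit me, one way to avoid repeating the mistake is to generate jaas.conf from a script that refuses relative paths. This is just a sketch: write_jaas is a made-up helper, and the principal and paths in the usage comment are the ones from my setup.

```shell
#!/bin/sh
# write_jaas PRINCIPAL KEYTAB OUTFILE
# Writes a KafkaClient JAAS section; rejects a keytab path that is not absolute.
write_jaas() {
  case "$2" in
    /*) ;;                                              # absolute path: fine
    *)  echo "keytab path must be absolute: $2" >&2
        return 1 ;;
  esac
  cat > "$3" <<EOF
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="$2"
  principal="$1";
};
EOF
}

# Hypothetical usage:
#   write_jaas "isegrim@TEST.COM" "/home/isegrim/isegrim.keytab" /tmp/jaas.conf
```

The generated file still has to be copied to the same local path on every node that runs an executor.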
06-16-2017
03:52 PM
1 Kudo
Hi,
Thanks for your hint, mbigelow.
As far as I know, a Unix path is just a path, and writing it with or without a leading ./ means the same thing: in this case the current working directory, which is the application's HDFS directory. But I tried your suggestion, and it inspired me to investigate further:
isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar
But it gave me the same log and error (without './' in the error message) 😕
Caused by: java.io.IOException: jaas.conf (No such file or directory)
I also tried an HDFS path to jaas.conf, but that failed too:
hdfs dfs -put jaas.conf
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar
Error:
Caused by: java.io.IOException: hdfs:///user/isegrim/jaas.conf (No such file or directory)
But then I put the file into each worker's /tmp, and now it looks like executors read it from local operating system storage rather than from the application's HDFS CWD (--files flag):
for i in {1..4}; do
scp jaas.conf worker${i}:/tmp/jaas.conf
done
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar
This is the new error:
...
17/06/17 00:06:28 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
...
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
...
So I distributed the keytab to the local filesystem of every worker node, changed the path from the app's HDFS CWD to the OS local filesystem, and it works!
$ cat jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/isegrim/isegrim.keytab"
principal="isegrim@TEST.COM";
};
Distribute the new jaas.conf and keytab:
$ for i in {1..4}; do
scp isegrim.keytab worker${i}:/home/isegrim/isegrim.keytab
done
$ for i in {1..4}; do
scp jaas.conf worker${i}:/tmp/jaas.conf
done
Run the app and check the logs:
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar
...
17/06/17 00:23:17 INFO streaming.StreamingContext: StreamingContext started
...
-------------------------------------------
Time: 1497651840000 ms
-------------------------------------------
58

Thank you mbigelow for the inspiration!
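The jaas.conf shown above can also be generated, so that the keytab path and principal stay consistent with what is passed to spark2-submit. This is a minimal bash sketch, not part of any Cloudera or Kafka tooling: the helper name write_jaas_conf and its arguments are hypothetical, and the demo writes to a scratch file rather than /tmp/jaas.conf.

```shell
#!/usr/bin/env bash
# Sketch: render a KafkaClient JAAS section that points at a keytab on the
# local filesystem. write_jaas_conf is a hypothetical helper name.
write_jaas_conf() {
  local keytab="$1" principal="$2" out="$3"
  # This sketch insists on an absolute keytab path, because a relative path
  # is resolved against the working directory of whichever JVM reads it.
  case "$keytab" in
    /*) ;;
    *) echo "keytab path must be absolute: $keytab" >&2; return 1 ;;
  esac
  cat > "$out" <<EOF
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="$keytab"
  principal="$principal";
};
EOF
}

# Demo with the values from the post, written to a scratch file.
jaas_out="$(mktemp)"
write_jaas_conf /home/isegrim/isegrim.keytab isegrim@TEST.COM "$jaas_out"
cat "$jaas_out"
```

The generated file would then be copied to the worker nodes with the same scp loop as above.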
06-16-2017
08:59 AM
Hello,

I have CDH 5.10.1, Spark 2.1 and Kafka 2.1 (0.10), all of them kerberized. I was able to connect to Kafka with the Kafka CLI tools, as described here: https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#concept_lcn_4mm_s5

But I cannot connect as a Spark2 consumer using the direct stream method, because jaas.conf cannot be found, even though spark2-submit properly uploads it to HDFS at launch.

I was following these two documents:
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html

My code simply counts the elements in each batch of a stream.

Dependencies in build.sbt:

isegrim@sparkgw:~/kafka-spark-krb-test$ cat build.sbt
import sbt.Keys.scalaVersion
lazy val root = (project in file(".")).
settings(
name := "kafka-spark-krb-test",
version := "1.0",
scalaVersion := "2.11.8"
)
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
lazy val spark = Seq(
"spark-core",
"spark-streaming",
"spark-streaming-kafka-0-10"
).map("org.apache.spark" %% _ % "2.1.0.cloudera1" % "provided")
libraryDependencies ++= spark
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
assemblyJarName in assembly := "kafka-spark-krb-test.jar"

Main code:

isegrim@sparkgw:~/kafka-spark-krb-test$ cat src/main/scala/af/spark2/tests/Main.scala
package af.spark2.tests
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
object Main {
val streamingBatchInterval = Seconds(60)
val streamingTimeout = 432000000
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
val ssc = new StreamingContext(conf, streamingBatchInterval)
ssc.checkpoint("spark/checkpoint/kafka-spark-krb-test")
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> "broker1:9092,broker2:9092",
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kafka-spark-krb-test",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("mytopic")
val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
messages.count().print()
ssc.start()
ssc.awaitTerminationOrTimeout(streamingTimeout)
ssc.stop(true, true)
}
}

I start the app like this:

isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

Some of the log and the error I get:

...
17/06/16 17:16:38 INFO yarn.Client: Uploading resource file:/home/isegrim/.keytab/isegrim.keytab -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/isegrim.keytab
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/home/isegrim/jaas.conf#jaas.conf -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/jaas.conf
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/tmp/spark-f1a0e9a4-0c13-495a-a904-2dd1ca1303c1/__spark_conf__1583085251581943316.zip -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/__spark_conf__.zip
17/06/16 17:16:39 INFO spark.SecurityManager: Changing view acls to: isegrim
17/06/16 17:16:39 INFO spark.SecurityManager: Changing modify acls to: isegrim
...
17/06/16 17:16:49 INFO dstream.ForEachDStream: Slide time = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Storage level = Serialized 1x Replicated
17/06/16 17:16:49 INFO dstream.ForEachDStream: Checkpoint interval = null
17/06/16 17:16:49 INFO dstream.ForEachDStream: Remember interval = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1b485c8b
17/06/16 17:16:49 INFO consumer.ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [broker1:9092, broker2:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = kafka-spark-krb-test
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = SASL_PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
17/06/16 17:16:49 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at af.spark2.tests.Main$.main(Main.scala:34)
at af.spark2.tests.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:47)
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 25 more
Caused by: java.io.IOException: ./jaas.conf (No such file or directory)
at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
... 41 more

Is it possible to connect to a kerberized Kafka from Spark Streaming using Cloudera's CDH, KAFKA and SPARK2, or Spark 1.6 from CDH?

Best Regards
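The innermost cause in the trace above is a plain file lookup. The trace passes through SparkSubmit.main, so with --deploy-mode client it is the driver JVM on the gateway host that fails to open ./jaas.conf, and a relative path there is resolved against the directory spark2-submit was started from. The lookup can be checked before submitting with a minimal bash sketch like the one below. The helper name check_jaas_path is hypothetical, and the demo uses scratch directories rather than the real paths from the post.

```shell
#!/usr/bin/env bash
# Sketch: check whether the path given to -Djava.security.auth.login.config
# is readable from a given working directory. check_jaas_path is a
# hypothetical helper name.
check_jaas_path() {
  local workdir="$1" jaas_path="$2"
  # Run in a subshell so the caller's working directory is left unchanged.
  (
    cd "$workdir" || exit 2
    if [ -r "$jaas_path" ]; then
      echo "readable: $jaas_path (from $workdir)"
    else
      echo "missing: $jaas_path (from $workdir)"
      exit 1
    fi
  )
}

# Self-contained demo: jaas.conf sits one level above the project directory,
# so ./jaas.conf resolves from the parent but not from the project.
demo_home="$(mktemp -d)"
mkdir "$demo_home/project"
touch "$demo_home/jaas.conf"
check_jaas_path "$demo_home/project" ./jaas.conf || true
check_jaas_path "$demo_home" ./jaas.conf
```

This only covers the driver side. The executors run in YARN containers with their own working directories, so the same check would have to be made there separately.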
Labels: Apache Kafka, Apache Spark, Kerberos