Member since: 01-11-2016

34 Posts | 8 Kudos Received | 5 Solutions
        My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 7054 | 06-21-2017 06:09 AM |
| | 38522 | 06-16-2017 03:52 PM |
| | 23332 | 12-12-2016 02:30 PM |
| | 4663 | 09-01-2016 08:40 AM |
| | 3914 | 05-09-2016 05:01 PM |
			
    
	
		
		
Posted 11-25-2017 01:25 AM (2 Kudos)
		
	
				
		
	
		
					
OK, the problem is that for some reason spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar is not loaded, so lookupDataSource does not see KafkaSourceProvider as an implementation of the DataSourceRegister trait and never finds its override def shortName(): String = "kafka". When no registered source matches, Spark falls back to appending ".DefaultSource" to the given provider name, which is why I got ClassNotFoundException: kafka.DefaultSource.

The solution is to:

1. Get the jar spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar, either by

   a) downloading it manually from the Cloudera repo: https://repository.cloudera.com/cloudera/cloudera-repos/org/apache/spark/spark-sql-kafka-0-10_2.11/2.1.0.cloudera1/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar

   or

   b) running spark2-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0, as Jacek Laskowski explains here: https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-sql-streaming-KafkaSource.adoc

2. Run your app via spark2-submit --jars ~/.ivy2/cache/org.apache.spark/spark-sql-kafka-0-10_2.11/jars/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar (see the sketch below).

Very important: remember to set the proper Kafka version (in this case 0.10) via export or in the Spark2 service configuration in Cloudera Manager:
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html
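For completeness, a minimal sketch of how steps 1b and 2 fit together in a single submit. This assumes the jar has already been pulled into the local ivy cache; the exact jar path depends on which repo/version was resolved, and the app jar name is just a placeholder:

# minimal sketch; ivy path taken from step 2 above, app jar name is a placeholder
export SPARK_KAFKA_VERSION=0.10   # or set the Kafka version in the Spark2 service config in CM
spark2-submit \
  --master yarn \
  --deploy-mode client \
  --jars ~/.ivy2/cache/org.apache.spark/spark-sql-kafka-0-10_2.11/jars/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar \
  target/scala-2.11/my-app.jar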
						
					
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
Posted 11-23-2017 08:43 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
Hi,

I use CDH 5.10.1, Spark 2.1 and Kafka 2.1.

When I try a simple ETL program:

val mystream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mybroker:9092")
  .option("subscribe", "mytopic")
  .load()

with these dependencies in build.sbt:

scalaVersion := "2.11.8"

lazy val spark = Seq(
  "spark-core",
  "spark-hive",
  "spark-streaming",
  "spark-sql",
  "spark-streaming-kafka-0-10"
).map("org.apache.spark" %% _ % "2.1.0.cloudera1" % "provided")

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0.cloudera1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0-kafka-2.1.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.10.0-kafka-2.1.0"
libraryDependencies ++= spark

resolvers ++= Seq(
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)

I get this error when submitting my app:
   
 Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:594)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:197)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at myetl.Main$.main(Main.scala:20)
        at myetl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:579)
        ... 18 more 
   
   
 Please help... 
   
   
   
						
					
				
			
			
			
			
			
			
			
			
			
		
		
			
				
						
						
						
		
			
	
					
			
		
	
	
	
	
				
		
	
	
Labels: Apache Hive, Apache Kafka, Apache Spark
 
			
    
	
		
		
Posted 08-11-2017 03:16 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
So, summarizing:

- When starting Spark with --deploy-mode client, one must distribute the JAAS file and the keytab to every machine.
- When starting Spark with --deploy-mode cluster, one must distribute the JAAS file and the keytab with --files and/or --keytab.

See the sketch below.
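A minimal sketch of the two cases, reusing the broker, queue, principal and paths from my test app further down in this thread (they are specific to my environment). Note that I only ran client mode myself, so the cluster-mode variant is an untested sketch of how I read the summary above:

# client mode: jaas.conf and the keytab must already exist at the same local path on every node
# (e.g. copied out beforehand with scp), because the Kafka consumer reads them from the local filesystem
SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab --principal isegrim@TEST.COM \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar

# cluster mode (untested sketch): ship the files with --files/--keytab and point the JVMs at the
# localized copies in the container working directory
SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn --deploy-mode cluster \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar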
						
					
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
Posted 06-21-2017 06:09 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
Well, as far as I can read the code I've cited, there is a problem when Kafka tries to list roles, and it does this when caching of Sentry privileges is enabled.

What I did:

1. In the Kafka configuration, disabled sentry.kafka.caching.enable.
2. In the Sentry configuration, removed the kafka group from sentry.service.admin.group, so the configuration is:

root@node1:~# cd `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | head -n1`
root@node1:/var/run/cloudera-scm-agent/process/1442-sentry-SENTRY_SERVER# grep -A 1 -E "sentry.service.(allow.connect|admin.group)" sentry-site.xml
    <name>sentry.service.admin.group</name>
    <value>hive,impala,hue,sudo</value>
--
    <name>sentry.service.allow.connect</name>
    <value>hive,impala,hue,hdfs,solr,kafka</value>
root@node1:/var/run/cloudera-scm-agent/process/1442-sentry-SENTRY_SERVER#

3. Deployed the client configuration and restarted the dependent services.

After these steps Kafka started properly, so turning off caching of Sentry privileges was a workaround that let Kafka start without errors.

I still have problems when using the kafka-sentry tool, though:

root@node1:~# kinit isegrim
Password for isegrim@TEST.COM:
root@node1:~# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: isegrim@TEST.COM
Valid starting    Expires           Service principal
21/06/2017 15:01  22/06/2017 01:01  krbtgt/TEST.COM@TEST.COM
        renew until 28/06/2017 15:01
root@node1:~# kafka-sentry --config `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | head -n1` -lp -r myrole
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2017-06-21 15:02:16,992] ERROR Config key sentry.service.client.server.rpc-address is required (org.apache.sentry.provider.db.generic.tools.SentryShellKafka)
java.lang.NullPointerException: Config key sentry.service.client.server.rpc-address is required
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl.<init>(SentryGenericServiceClientDefaultImpl.java:123)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory.create(SentryGenericServiceClientFactory.java:31)
        at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.run(SentryShellKafka.java:51)
        at org.apache.sentry.provider.db.tools.SentryShellCommon.executeShell(SentryShellCommon.java:241)
        at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.main(SentryShellKafka.java:96)
The operation failed. Message: Config key sentry.service.client.server.rpc-address is required
root@node1:~#

I can't see this configuration option (sentry.service.client.server.rpc-address) in CM. An rpc-address is configured in the CDH 5.10 Sentry service configuration, but there is no explanation of which address it should be, or at least I can't find it here:
https://www.cloudera.com/documentation/enterprise/5-10-x/topics/sg_sentry_service_config.html

Besides that, I do have that address set and working:

root@node1:~# grep -A 1 rpc `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | head -n1`/sentry-site.xml
    <name>sentry.service.server.rpc-address</name>
    <value>node1</value>
--
    <name>sentry.service.server.rpc-port</name>
    <value>8038</value>
root@node1:~#
root@node1:~# ps -ef | grep `netstat -anpt | grep LISTEN | grep ':8038' | awk '{print $7}' | awk -F '/' '{print $1}'`
sentry    4599  2654  0 14:35 ?        00:00:20 /usr/lib/jvm/java-8-oracle/jre/bin/java -Xmx1000m -Dhadoop.log.dir=/opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.41/lib/hadoop/logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.41/lib/hadoop -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Djava.library.path=/opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.41/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Xms268435456 -Xmx268435456 -XX:OnOutOfMemoryError=/usr/lib/cmf/service/common/killparent.sh -Dhadoop.security.logger=INFO,NullAppender org.apache.hadoop.util.RunJar /opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.41/lib/sentry/lib/sentry-core-common-1.5.1-cdh5.10.0.jar org.apache.sentry.SentryMain --command service --log4jConf /run/cloudera-scm-agent/process/1442-sentry-SENTRY_SERVER/sentry-log4j.properties -conffile /run/cloudera-scm-agent/process/1442-sentry-SENTRY_SERVER/sentry-site.xml
sentry    4616  4599  0 14:35 ?        00:00:00 python2.7 /usr/lib/cmf/agent/build/env/bin/cmf-redactor /usr/lib/cmf/service/sentry/sentry.sh
root@node1:~#

One more update: when I run the kafka-sentry command while logged in to Kerberos as the kafka user, it gives me the same error as before disabling Sentry privilege caching in Kafka:

root@node2:~# cd `ls -dt /var/run/cloudera-scm-agent/process/*kafka* | head -n1`
root@node2:/var/run/cloudera-scm-agent/process/1445-kafka-KAFKA_BROKER# kinit -kt kafka.keytab kafka/node2@TEST.COM
root@node2:/var/run/cloudera-scm-agent/process/1445-kafka-KAFKA_BROKER# kafka-sentry -lp -r zto
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/06/21 17:18:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/06/21 17:18:10 ERROR tools.SentryShellKafka: Access denied to kafka. Server Stacktrace: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$10.handle(SentryGenericPolicyProcessor.java:607)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.list_sentry_privileges_by_role(SentryGenericPolicyProcessor.java:599)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:977)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:962)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorWrapper.process(SentryGenericPolicyProcessorWrapper.java:37)
        at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka. Server Stacktrace: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$10.handle(SentryGenericPolicyProcessor.java:607)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.list_sentry_privileges_by_role(SentryGenericPolicyProcessor.java:599)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:977)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:962)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorWrapper.process(SentryGenericPolicyProcessorWrapper.java:37)
        at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
        at org.apache.sentry.service.thrift.Status.throwIfNotOk(Status.java:113)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl.listPrivilegesByRoleName(SentryGenericServiceClientDefaultImpl.java:484)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl.listPrivilegesByRoleName(SentryGenericServiceClientDefaultImpl.java:494)
        at org.apache.sentry.provider.db.generic.tools.command.ListPrivilegesByRoleCmd.execute(ListPrivilegesByRoleCmd.java:45)
        at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.run(SentryShellKafka.java:83)
        at org.apache.sentry.provider.db.tools.SentryShellCommon.executeShell(SentryShellCommon.java:241)
        at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.main(SentryShellKafka.java:96)
The operation failed. Message: Access denied to kafka. Server Stacktrace: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$10.handle(SentryGenericPolicyProcessor.java:607)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.list_sentry_privileges_by_role(SentryGenericPolicyProcessor.java:599)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:977)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_privileges_by_role.getResult(SentryGenericPolicyService.java:962)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorWrapper.process(SentryGenericPolicyProcessorWrapper.java:37)
        at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
root@node2:/var/run/cloudera-scm-agent/process/1445-kafka-KAFKA_BROKER#

When I add the kafka group to the Sentry admin groups (sentry.service.admin.group), it looks like everything works, but only for the Kerberos-logged-in kafka user:

root@node2:~# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/node2@TEST.COM

Valid starting    Expires           Service principal
21/06/2017 17:16  22/06/2017 03:16  krbtgt/TEST.COM@TEST.COM
        renew until 28/06/2017 17:16
root@node2:~# kafka-sentry -lp -r myrole
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/06/21 17:31:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root@node2:~# 
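Going back to the rpc-address error above, an untested sketch of what I plan to try next: the key in that message looks like a client-side setting, so give kafka-sentry a config directory that actually defines it. The property names are taken from the error message, and the host/port are the ones shown working in the grep output above:

# untested sketch: build a client config dir for kafka-sentry that contains the missing key
mkdir -p /tmp/sentry-client-conf
cp `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | head -n1`/sentry-site.xml /tmp/sentry-client-conf/
# then add to /tmp/sentry-client-conf/sentry-site.xml:
#   <property><name>sentry.service.client.server.rpc-address</name><value>node1</value></property>
#   <property><name>sentry.service.client.server.rpc-port</name><value>8038</value></property>
kafka-sentry --config /tmp/sentry-client-conf -lp -r myrole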
						
					
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
Posted 06-21-2017 03:35 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
Hi,

I've checked the code and run some tests, and it looks like Sentry, besides the sentry.service.allow.connect setting, also needs the kafka group added as an admin group in sentry.service.admin.group. That can be a security problem, because anyone in the kafka group will then be able to do anything on the cluster.

The handle() method in org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor (seen in the Kafka and Sentry stack traces) requires the user to be in one of the Sentry admin groups, and after I added the kafka group the Kafka brokers started properly without errors:

https://github.com/cloudera/sentry/blob/cdh5-1.5.1_5.10.0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java

      public Response<Set<TSentryRole>> handle() throws Exception {
        validateClientVersion(request.getProtocol_version());
        Set<String> groups = getRequestorGroups(conf, request.getRequestorUserName());
        if (AccessConstants.ALL.equalsIgnoreCase(request.getGroupName())) {
          //check all groups which requestorUserName belongs to
        } else {
          boolean admin = inAdminGroups(groups);
          //Only admin users can list all roles in the system ( groupname = null)
          //Non admin users are only allowed to list only groups which they belong to
          if(!admin && (request.getGroupName() == null || !groups.contains(request.getGroupName()))) {
            throw new SentryAccessDeniedException(ACCESS_DENIAL_MESSAGE + request.getRequestorUserName());
          }
          groups.clear();
          groups.add(request.getGroupName());
        }

I don't see the sense of adding the kafka group to sentry.service.admin.group, because the Sentry documentation says:

https://cwiki.apache.org/confluence/display/SENTRY/Sentry+Service+Configuration
sentry.service.admin.group - List of groups allowed to make policy updates

Since the kafka group should contain only the kafka service, which only needs to check permissions against Sentry, there is no need for the write permission to make policy updates; Kafka itself should not make any policy changes.

I've double-checked for typos and found none. I tried both copy/paste and typing "kafka" by hand in every field in CM, then restarted the cluster and deployed the client configuration, but the result above was the only one I got.
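Since the check in handle() is driven by the groups that the Sentry server resolves for the requesting user (getRequestorGroups), it may also be worth confirming what those groups actually are on the Sentry host. A quick sketch, assuming the default shell-based Hadoop group mapping is in use:

# on the Sentry Server host: which Linux groups does the kafka user resolve to?
id kafka
# and what Hadoop's group mapping reports for the same user (may differ if LDAP mapping is configured)
hdfs groups kafka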
						
					
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
Posted 06-20-2017 01:18 PM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
Hi pdvorak!

Thank you for your fast reply. Yes, this is very strange for me too, which is why I decided to turn to the community. I have 4 nodes, where node1 runs the Sentry Server and node{2..4} are worker nodes (HDFS DataNode + YARN NodeManager).

id kafka:

isegrim@node1:~$ for i in {1..4}; do ssh node$i "id kafka"; done
uid=998(kafka) gid=999(kafka) groups=999(kafka)
uid=998(kafka) gid=999(kafka) groups=999(kafka)
uid=998(kafka) gid=999(kafka) groups=999(kafka)
uid=998(kafka) gid=999(kafka) groups=999(kafka)
isegrim@node1:~$

Stack trace:

isegrim@node1:~$ grep '2017-06-20 16:01:30.* ERROR' -A 16 /var/log/sentry/hadoop-cmf-sentry-SENTRY_SERVER-node1.log.out
2017-06-20 16:01:30,840 ERROR org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor: Access denied to kafka
org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$9.handle(SentryGenericPolicyProcessor.java:575)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.list_sentry_roles_by_group(SentryGenericPolicyProcessor.java:563)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_roles_by_group.getResult(SentryGenericPolicyService.java:957)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService$Processor$list_sentry_roles_by_group.getResult(SentryGenericPolicyService.java:942)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorWrapper.process(SentryGenericPolicyProcessorWrapper.java:37)
        at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:01:39,091 ERROR org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor: Access denied to kafka
org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
isegrim@node1:~$

Confirmation of the sentry.service.allow.connect setting:

root@node1:~# cd `ls -dt /var/run/cloudera-scm-agent/process/*sentry* | tail -n1`
root@node1:/var/run/cloudera-scm-agent/process/1365-sentry-SENTRY_SERVER# grep -A 1 sentry.service.allow.connect sentry-site.xml
    <name>sentry.service.allow.connect</name>
    <value>hive,impala,hue,hdfs,solr,kafka</value>
root@node1:/var/run/cloudera-scm-agent/process/1365-sentry-SENTRY_SERVER#

[Screenshot: Kafka configuration for Sentry (in CM)]
[Screenshot: Kafka configuration - users (in CM)]
[Screenshot: Sentry configuration - sentry.service.allow.connect (in CM)]

I have no idea what I have done wrong or what I have missed. Please help.

Best Regards
						
					
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
Posted 06-20-2017 08:35 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
Hello,

I have CDH 5.10.1 and KAFKA 2.1 (0.10), both kerberized.

I wanted to use Sentry with Kafka, following this procedure:
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#using_kafka_with_sentry

I completed all five steps of the procedure, deployed the client configuration and restarted the dependent services, and then Kafka did not start, giving this error:

2017-06-20 16:01:30,892 ERROR org.apache.sentry.kafka.binding.KafkaAuthBindingSingleton: Unable to create KafkaAuthBinding
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.sentry.kafka.binding.KafkaAuthBinding.createAuthProvider(KafkaAuthBinding.java:194)
        at org.apache.sentry.kafka.binding.KafkaAuthBinding.<init>(KafkaAuthBinding.java:97)
        at org.apache.sentry.kafka.binding.KafkaAuthBindingSingleton.configure(KafkaAuthBindingSingleton.java:63)
        at org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer.configure(SentryKafkaAuthorizer.java:120)
        at kafka.server.KafkaServer$$anonfun$startup$3.apply(KafkaServer.scala:211)
        at kafka.server.KafkaServer$$anonfun$startup$3.apply(KafkaServer.scala:209)
        at scala.Option.map(Option.scala:146)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:209)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
        at kafka.Kafka$.main(Kafka.scala:67)
        at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:76)
        at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)
Caused by: java.lang.RuntimeException: Failed to get privileges from Sentry to build cache.
        at org.apache.sentry.provider.db.generic.SentryGenericProviderBackend.initialize(SentryGenericProviderBackend.java:89)
        at org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine.<init>(SimpleKafkaPolicyEngine.java:44)
        ... 16 more
Caused by: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka. Server Stacktrace: org.apache.sentry.provider.db.SentryAccessDeniedException: Access denied to kafka
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor$9.handle(SentryGenericPolicyProcessor.java:575)
        at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessor.requestHandle(SentryGenericPolicyProcessor.java:201)

The same messages appear in the Sentry log.

I've checked the Sentry configuration, and the kafka user is configured as one of the service users in the sentry.service.allow.connect setting.

kafka is a local (not LDAP) user with the same uid and gid in Linux on every cluster node.

Can someone tell me what else I should do so that the kafka user is allowed to query Sentry?

Best Regards
						
					
				
			
			
			
			
			
			
			
			
			
		
		
			
				
						
						
						
		
			
	
					
			
		
	
	
	
	
				
		
	
	
Labels: Apache Sentry, Kerberos
 
			
    
	
		
		
Posted 06-17-2017 02:41 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
No, no, before distributing it to the worker nodes my jaas.conf was:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./isegrim.keytab"
principal="isegrim@TEST.COM";
};

After I figured out that the KafkaConsumer constructor looks for jaas.conf and the keytab on the executor machine's local filesystem instead of in the application's HDFS CWD, I distributed the keytab to the Linux filesystem and changed the keytab path from ./isegrim.keytab (I had thought it would be resolved relative to the Spark application's HDFS CWD) to /home/isegrim/isegrim.keytab.

I agree that it should not have to work like this. Now I have to take care of distributing and securing the keytab and jaas.conf files myself.

I think it must be something in the KafkaConsumer constructor, which does not know that those files are distributed with the application in its CWD on HDFS, and so looks for them on the local machine's filesystem.

If I have more time I'll try to check the code of those constructors to figure it out.
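Given that behaviour, a simple sanity check before submitting is to verify that the files referenced by jaas.conf and by -Djava.security.auth.login.config really exist at the expected local paths on every node. A small sketch using the same worker naming and paths as in my test below (adjust host names and paths to your setup):

# check that the keytab and jaas.conf exist locally on every node that may run an executor
for i in {1..4}; do
  ssh worker${i} "ls -l /home/isegrim/isegrim.keytab /tmp/jaas.conf"
done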
						
					
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
Posted 06-16-2017 03:52 PM (1 Kudo)
		
	
				
		
	
		
					
Hi,

Thanks for your hint, mbigelow. As far as I know, a Unix path is just a path, and starting it with ./ or without has the same meaning - in this case the current working directory, which is the application's HDFS directory. But I tried your suggestion, and it inspired further investigation:

isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

But it gave me the same log + error (just without the './' in the error message) 😕

Caused by: java.io.IOException: jaas.conf (No such file or directory)

I also tried an HDFS path to jaas.conf, but that failed too:

hdfs dfs -put jaas.conf
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

Error:

Caused by: java.io.IOException: hdfs:///user/isegrim/jaas.conf (No such file or directory)

But when I put the file into each worker's /tmp, it looks like the executors read it from the local operating system storage rather than from the HDFS application CWD (the --files flag):

for i in {1..4}; do
    scp jaas.conf worker${i}:/tmp/jaas.conf
done
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

This is the new error:

...
17/06/17 00:06:28 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
...
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
...

So I distributed the keytab to every worker node's local filesystem, changed the path from the app's HDFS CWD to the local filesystem, and it works!

$ cat jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/isegrim/isegrim.keytab"
principal="isegrim@TEST.COM";
};

Distribute the new jaas.conf and the keytab:

$ for i in {1..4}; do
    scp isegrim.keytab worker${i}:/home/isegrim/isegrim.keytab
done
$ for i in {1..4}; do
    scp jaas.conf worker${i}:/tmp/jaas.conf
done

Run the app and check the logs:

SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar
...
17/06/17 00:23:17 INFO streaming.StreamingContext: StreamingContext started
...
-------------------------------------------
Time: 1497651840000 ms
-------------------------------------------
58

Thank you, mbigelow, for the inspiration!
						
					
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
Posted 06-16-2017 08:59 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
Hello,

I have CDH 5.10.1, SPARK2.1 and KAFKA2.1 (0.10), all of them kerberized.

I was able to connect to Kafka from the Kafka CLI tools, as described here:
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#concept_lcn_4mm_s5

But I cannot connect as a Spark2 consumer with the direct stream method, because jaas.conf cannot be found, even though it is properly distributed to HDFS by spark2-submit.

I've been following these two documents:
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html

My code simply counts the elements in each batch of a stream.

Linking in build.sbt:

isegrim@sparkgw:~/kafka-spark-krb-test$ cat build.sbt
import sbt.Keys.scalaVersion
lazy val root = (project in file(".")).
        settings(
                name := "kafka-spark-krb-test",
                version := "1.0",
                scalaVersion := "2.11.8"
)
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
lazy val spark = Seq(
        "spark-core",
        "spark-streaming",
        "spark-streaming-kafka-0-10"
).map("org.apache.spark" %% _ % "2.1.0.cloudera1" % "provided")
libraryDependencies ++= spark
assemblyMergeStrategy in assembly := {
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x =>
                val oldStrategy = (assemblyMergeStrategy in assembly).value
                oldStrategy(x)
}
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
assemblyJarName in assembly := "kafka-spark-krb-test.jar"      Main code      isegrim@sparkgw:~/kafka-spark-krb-test$ cat src/main/scala/af/spark2/tests/Main.scala  package af.spark2.tests
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
object Main {
  val streamingBatchInterval = Seconds(60)
  val streamingTimeout = 432000000
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val ssc = new StreamingContext(conf, streamingBatchInterval)
    ssc.checkpoint("spark/checkpoint/kafka-spark-krb-test")
    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-spark-krb-test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("mytopic")
    val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    messages.count().print()
    ssc.start()
    ssc.awaitTerminationOrTimeout(streamingTimeout)
    ssc.stop(true, true)
  }
I start the app like this:

isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

Some log + error I get:

...
17/06/16 17:16:38 INFO yarn.Client: Uploading resource file:/home/isegrim/.keytab/isegrim.keytab -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/isegrim.keytab
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/home/isegrim/jaas.conf#jaas.conf -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/jaas.conf
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/tmp/spark-f1a0e9a4-0c13-495a-a904-2dd1ca1303c1/__spark_conf__1583085251581943316.zip -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/__spark_conf__.zip
17/06/16 17:16:39 INFO spark.SecurityManager: Changing view acls to: isegrim
17/06/16 17:16:39 INFO spark.SecurityManager: Changing modify acls to: isegrim
...
17/06/16 17:16:49 INFO dstream.ForEachDStream: Slide time = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Storage level = Serialized 1x Replicated
17/06/16 17:16:49 INFO dstream.ForEachDStream: Checkpoint interval = null
17/06/16 17:16:49 INFO dstream.ForEachDStream: Remember interval = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1b485c8b
17/06/16 17:16:49 INFO consumer.ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [broker1:9092, broker2:9092]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 2147483647
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        group.id = kafka-spark-krb-test
        retry.backoff.ms = 100
        ssl.secure.random.implementation = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = kafka
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = SASL_PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest
17/06/16 17:16:49 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
        at af.spark2.tests.Main$.main(Main.scala:34)
        at af.spark2.tests.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
                at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
        at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
        at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:47)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
        ... 25 more
Caused by: java.io.IOException: ./jaas.conf (No such file or directory)
        at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
        at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
        ... 41 more

Is it possible to connect to kerberized Kafka from Spark Streaming using Cloudera's CDH, KAFKA and SPARK2 (or Spark 1.6 from CDH)?

Best Regards
						
					
				
			
			
			
			
			
			
			
			
			
		
		
			
				
						
						
						
		
			
	
					
			
		
	
	
	
	
				
		
	
	
Labels: Apache Kafka, Apache Spark, Kerberos