
Kafka DirectStream and Python


Hello all,

Does Cloudera support Kafka direct streams with Python?

I'm running into issues with missing methods.









Could you elaborate on what you are trying to achieve and how? What kind of errors are you receiving?


I'm assuming you are talking about setting up a Spark/Kafka integration using Python as the Spark language, something like the following:
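For instance, a minimal PySpark direct-stream sketch (the broker list, topic name, and CSV parsing are illustrative placeholders; this assumes the Spark 2.x DStream API):

```python
# Hypothetical broker and topic names, for illustration only.
KAFKA_PARAMS = {"metadata.broker.list": "broker1:9092,broker2:9092"}
TOPICS = ["events"]

def parse_line(record):
    """Split a (key, value) Kafka record's CSV value into fields."""
    _key, value = record
    return value.split(",")

def run_stream():
    # pyspark imports live here so the helpers above are usable
    # without a Spark installation.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils  # Spark 2.x DStream API

    sc = SparkContext(appName="kafka-direct-stream")
    ssc = StreamingContext(sc, batchDuration=10)  # 10-second micro-batches

    # Direct (receiver-less) stream: one RDD partition per Kafka partition.
    stream = KafkaUtils.createDirectStream(ssc, TOPICS, KAFKA_PARAMS)
    stream.map(parse_line).pprint()

    ssc.start()
    ssc.awaitTermination()

# run_stream() would be launched via spark2-submit with the
# spark-streaming-kafka jar on the classpath.
```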


Could you confirm or amend?


Cloudera's distribution of Kafka officially supports Flume, Spark, and Java clients [1], as these have been tested by our development team. However, Spark Structured Streaming is currently untested and unsupported.




Hello Manuel,

I'm trying to use Structured Streaming with Python, but all my attempts have been unsuccessful.


Please read this post:

That error was the first one I got, so I changed the jar to spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar, and the error is now the one shown in the attached picture.


There is no way to make progress from this point, so we started migrating to direct streams. With that option we also hit other issues in Python, so we are now using Scala/Java code, but that is not a problem.


If Structured Streaming can be made available, I think it will be a great option for all Cloudera users.

Screen Shot 2018-10-29 at 10.52.13 AM.png


Currently, Spark Structured Streaming is not supported:




I switched from Python to Scala, which is better supported since Spark itself is written in Scala.


I kept trying, on a remote friend's suggestion, and found that the following works fine. So Structured Streaming is not supported, but it does work.



scala> import org.apache.spark.sql.Encoders

scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)

scala> val schema = Encoders.product[Amazon].schema

scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]
data: org.apache.spark.sql.Dataset[Amazon] = [EventId: string, DOCOMOEntitlementId: string ... 

scala> data.isStreaming 
res0: Boolean = true

scala> data.writeStream.outputMode("append").format("console")

res1: org.apache.spark.sql.streaming.DataStreamWriter[Amazon] = org.apache.spark.sql.streaming.DataStreamWriter@7dc56c0a

scala> res1.start()


On the other hand, using

spark.readStream.format("kafka") ...

leads to this issue:

java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(
  at java.lang.ClassLoader.loadClass(
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 50 more

Are there any jars from Cloudera that can solve this issue?

If there are no jars from Cloudera, what is the best match?
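In case it helps others who hit the same ClassNotFoundException: the kafka data source is not on Spark's classpath by default, so the spark-sql-kafka jar has to be supplied at launch. A sketch, assuming the Cloudera jar mentioned earlier in the thread and a CDH Spark 2.x shell (the local path is a placeholder, and the Scala/Spark versions in the coordinates must match your cluster's Spark build):

```shell
# Option 1: point at a local copy of the Cloudera jar mentioned above
spark2-shell --jars /path/to/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar

# Option 2: let Spark resolve the matching upstream package from Maven
spark2-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
```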


Well, after a while I found that the library I need is


And almost everything works fine, but...


org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0
18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: 
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1
18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: 
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2

and this depends on how the Kafka source code works; there is no option to avoid it.

Screen Shot 2018-11-15 at 12.42.20 PM.png 


This, of course, is a problem when Kafka is under Sentry. Is there any way to get it working without disabling Sentry?
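For what it's worth, the group names in the log lines above are generated: the Structured Streaming Kafka source creates a unique spark-kafka-source-<uuid> group.id for every query, so a Sentry grant on any fixed consumer group never matches it. With the direct-stream/DStream API (the fallback described earlier), the group.id can be pinned in the Kafka parameters and then authorized. A hedged sketch, where the role name spark_role, the group my_spark_group, and the topic events are all hypothetical:

```shell
# Pin group.id to my_spark_group on the client side (DStream API),
# then authorize exactly that consumer group in Sentry:
kafka-sentry -gpr -r spark_role -p "Host=*->ConsumerGroup=my_spark_group->action=read"

# The topic itself also needs read access:
kafka-sentry -gpr -r spark_role -p "Host=*->Topic=events->action=read"
```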






Structured Streaming is now supported in CDH 6.1.


Could you retry your test on this version? If you face the same issue there, let us know.