Kafka DirectStream and Python

Explorer

Hello all, 

Does Cloudera support Kafka direct stream with Python?

I'm running into an issue with missing methods.

 

1 ACCEPTED SOLUTION

Contributor

Hello,

 

Structured Streaming is now supported in 6.1 

 

https://blog.cloudera.com/blog/2018/12/cloudera-enterprise-6-1-0-is-now-available/

 

Could you retry testing it in this version? Should you face the same issue in that version let us know.


5 REPLIES 5

Contributor

Could you elaborate on what you are trying to achieve and how? What kind of errors are you receiving?

 

I'm assuming you are talking about setting up a Spark/Kafka integration using Python as the Spark language, something like the following:

http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html#approach-2-direct-approach-...
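
For reference, here is a minimal Python sketch of the direct approach described at that link. It assumes the Kafka 0.8 integration (`pyspark.streaming.kafka`), which is present in Spark 2.x and was removed in Spark 3.0. The broker addresses and topic name are placeholders, and the helper names `direct_stream_params` and `create_direct_stream` are made up for illustration.

```python
def direct_stream_params(brokers):
    """Build the Kafka parameters for the 0.8 direct (receiver-less) approach."""
    # The direct API talks to the brokers itself, so it needs the broker list
    # rather than a ZooKeeper quorum.
    return {"metadata.broker.list": ",".join(brokers)}


def create_direct_stream(ssc, topics, brokers):
    """Open a direct DStream over the given topics.

    `ssc` is an existing pyspark.streaming.StreamingContext.
    The import is done lazily so this module still loads on machines
    where PySpark (or its Kafka 0.8 module) is not installed.
    """
    from pyspark.streaming.kafka import KafkaUtils  # Kafka 0.8 integration only

    return KafkaUtils.createDirectStream(
        ssc, list(topics), direct_stream_params(brokers)
    )
```

With a `StreamingContext` named `ssc`, `create_direct_stream(ssc, ["events"], ["broker1:9092"])` would return a DStream of `(key, value)` tuples, provided the matching `spark-streaming-kafka-0-8` artifact is on the classpath. Note that the newer `spark-streaming-kafka-0-10` integration does not offer a Python API.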

 

Could you confirm or amend?

 

Cloudera's distribution of Kafka officially supports Flume, Spark, and Java clients [1] as these have been tested by our development team. However, Spark Structured Streaming is currently untested and unsupported. 

 

[1]: http://www.cloudera.com/documentation/enterprise/latest/topics/kafka_end_to_end.html#kafka_end_to_en...

Explorer

Hello Manuel,

I'm trying to use Structured Streaming with Python, but every attempt so far has been unsuccessful.

 

Please read this post: 

 

https://community.hortonworks.com/articles/197922/spark-23-structured-streaming-integration-with-apa...

This error was the first one I got, so I tried switching the JAR to spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar; the error is now the one shown in the attached picture.

 

We could not get any further from this point, so we started migrating to Direct Stream. With that option we hit another issue in Python, so we are now using Scala/Java code, which is not a problem for us.

 

If Structured Streaming could be made available, I think it would be a great option for all Cloudera users.


Screen Shot 2018-10-29 at 10.52.13 AM.png

Contributor

Spark Structured Streaming is currently not supported:

 

https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_structured_s...

 

 

Explorer

I switched from Python to Scala, which is better supported since Spark itself is written in Scala.

 

I kept trying, following a suggestion from a remote friend, and found that the following works fine, so Structured Streaming is not supported but it does work.

 

 

scala> import org.apache.spark.sql.Encoders

scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)

scala> val schema = Encoders.product[Amazon].schema

scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]
data: org.apache.spark.sql.Dataset[Amazon] = [EventId: string, DOCOMOEntitlementId: string ... 

scala> data.isStreaming 
res0: Boolean = true

scala> data.writeStream.outputMode("append").format("console")

res1: org.apache.spark.sql.streaming.DataStreamWriter[Amazon] = org.apache.spark.sql.streaming.DataStreamWriter@7dc56c0a

scala> res1.start()

 

On the other hand, using

spark.readStream.format("kafka") .... 

leads to this issue:

java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 50 more

Are there any JARs from Cloudera that can solve this issue?

If there are none, which JAR is the best match?

 

Well, after a while I found that the library I need is

 

https://repository.cloudera.com/cloudera/cloudera-repos/org/apache/spark/spark-sql-kafka-0-10_2.11/2...
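
For Python users, a minimal sketch of putting that connector on the classpath and opening the Kafka source might look like this. The JAR path, broker addresses, topic name, and the three helper functions are hypothetical. `spark.jars` does not pull in transitive dependencies, so this assumes `kafka-clients` is already available on the cluster.

```python
def kafka_source_options(bootstrap_servers, topic, starting_offsets="latest"):
    """Options for the Structured Streaming Kafka source."""
    return {
        "kafka.bootstrap.servers": ",".join(bootstrap_servers),
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def build_session(connector_jar):
    """Create a SparkSession with the spark-sql-kafka JAR on the classpath.

    `connector_jar` is a path to the downloaded JAR (hypothetical here).
    PySpark is imported lazily so the module loads without it installed.
    """
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName("kafka-structured-streaming-sketch")
        .config("spark.jars", connector_jar)
        .getOrCreate()
    )


def read_kafka_stream(spark, options):
    """Open the Kafka source and expose key and value as strings."""
    df = spark.readStream.format("kafka").options(**options).load()
    # Kafka delivers key and value as binary columns, so cast them for display.
    return df.selectExpr(
        "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value"
    )
```

`spark.jars` only takes effect if it is set before the session's JVM starts; in an already running shell, pass the same path with `--jars` when launching `pyspark` or `spark-submit` instead.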

 

And almost everything works fine, but...

 

org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0
18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: 
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1
18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: 
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2

This depends on how the Kafka code works, and there is no option to avoid it.
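
To make the pattern in that log easier to see, here is a small Python sketch that pulls the denied group ids out of the lines above. It reads only what the log shows: every id starts with `spark-kafka-source-` and ends in a number that goes up with each attempt. What the middle components mean is not stated in this thread, and the function names are made up for illustration.

```python
import re

# Matches the group id named in a GroupAuthorizationException message.
GROUP_ID = re.compile(r"Not authorized to access group: (\S+)")

# The log lines quoted in this post.
SAMPLE_LOG = [
    "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0",
    "18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: ",
    "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1",
    "18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: ",
    "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2",
]


def denied_groups(log_lines):
    """Return the consumer group ids named in authorization errors, in order."""
    groups = []
    for line in log_lines:
        match = GROUP_ID.search(line)
        if match:
            groups.append(match.group(1))
    return groups


def split_retry_suffix(group_id):
    """Split an id such as 'abc-driver-2' into ('abc-driver', 2)."""
    base, _, attempt = group_id.rpartition("-")
    return base, int(attempt)
```

Running `denied_groups(SAMPLE_LOG)` and then `split_retry_suffix` on each result shows one shared base with a different trailing number per attempt, which suggests that an authorization rule naming a single fixed consumer group would not cover the retries.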

Screen Shot 2018-11-15 at 12.42.20 PM.png 

 

This, of course, is a problem when Kafka is secured with Sentry. Is there any way to get it working without disabling Sentry?

 

 
