Explorer
Posts: 8
Registered: 06-15-2018

Kafka DirectStream and Python

Hello all, 

Does Cloudera support Kafka direct stream with Python?

I'm facing some issues with missing methods.

 

Cloudera Employee
Posts: 47
Registered: 07-10-2017

Re: Kafka DirectStream and Python

Could you elaborate on what you are trying to achieve and how? What kind of errors are you receiving?

 

I'm assuming you are talking about setting up a Spark/Kafka integration using Python as the Spark language, something like the following:

http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html#approach-2-direct-approach-...

 

Could you confirm or amend?

 

Cloudera's distribution of Kafka officially supports Flume, Spark, and Java clients [1] as these have been tested by our development team. However, Spark Structured Streaming is currently untested and unsupported. 

 

[1]: http://www.cloudera.com/documentation/enterprise/latest/topics/kafka_end_to_end.html#kafka_end_to_en...

Explorer
Posts: 8
Registered: 06-15-2018

Re: Kafka DirectStream and Python

Hello Manuel,

I'm trying to use Structured Streaming with Python, but all the attempts I've made have been unsuccessful.

 

Please read this post: 

 

https://community.hortonworks.com/articles/197922/spark-23-structured-streaming-integration-with-apa...

That error was the first one I got, so I tried changing the jar to spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar, and the error I get now is the one shown in the attached picture.

 

There is no way to move forward from this point, so we started migrating to Direct Stream. With that option we also hit other issues in Python, so now we are using Scala/Java code, but that is not a problem.
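
For reference, the Scala Direct Stream code we ended up with is along these lines. This is only a sketch using the spark-streaming-kafka-0-10 integration; the broker list, group id and topic are placeholders, and any security-related consumer properties are omitted:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Streaming context with a 10-second batch interval
val conf = new SparkConf().setAppName("DirectStreamSketch")
val ssc = new StreamingContext(conf, Seconds(10))

// Placeholder consumer configuration (security properties omitted)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my_group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

// Direct approach: no receiver, Spark tracks the Kafka offsets itself
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my_topic"), kafkaParams))

stream.map(_.value).print()   // print the value of each record
ssc.start()
ssc.awaitTermination()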

 

If we can make Structured Streaming available, I think it will be a great option for all Cloudera users.


[Attached screenshot: Screen Shot 2018-10-29 at 10.52.13 AM.png]
Cloudera Employee
Posts: 47
Registered: 07-10-2017

Re: Kafka DirectStream and Python

As of now, Spark Structured Streaming is not supported:

 

https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_structured_s...

 

 

Explorer
Posts: 8
Registered: 06-15-2018

Re: Kafka DirectStream and Python

[ Edited ]

I switched from Python to Scala, which is better supported since Spark itself is written in Scala.

 

I kept trying, on a remote friend's suggestion, and found that the following works fine. So Structured Streaming is not supported, but it is working.

 

 

scala> import org.apache.spark.sql.Encoders

scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)

scala> val schema = Encoders.product[Amazon].schema

scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]
data: org.apache.spark.sql.Dataset[Amazon] = [EventId: string, DOCOMOEntitlementId: string ... 

scala> data.isStreaming 
res0: Boolean = true

scala> data.writeStream.outputMode("append").format("console")

res1: org.apache.spark.sql.streaming.DataStreamWriter[Amazon] = org.apache.spark.sql.streaming.DataStreamWriter@7dc56c0a

scala> res1.start()
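
Note that this keeps running in spark-shell because the shell keeps the driver alive. In a standalone application you would keep the StreamingQuery returned by start() and block on it, something like:

val query = data.writeStream.outputMode("append").format("console").start()
query.awaitTermination()   // blocks until the streaming query stops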

 

On the other hand, using

spark.readStream.format("kafka") .... 

leads to this issue:

java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 50 more

Are there any jars from Cloudera that can solve this issue?

If there are no jars from Cloudera, what is the best match?

 

Well, after a while, I found that the library I need is

 

https://repository.cloudera.com/cloudera/cloudera-repos/org/apache/spark/spark-sql-kafka-0-10_2.11/2...
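
With that jar passed to spark2-shell through --jars, the Kafka source is found, and the read itself looks something like this (the bootstrap servers and topic below are placeholders):

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker list
  .option("subscribe", "my_topic")                      // placeholder topic
  .load()

// Kafka delivers binary key/value columns; cast the value to a string to inspect it
val values = kafkaStream.selectExpr("CAST(value AS STRING)")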

 

And everything goes almost fine, but...

 

org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0
18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: 
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1
18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: 
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2

This depends on how the Spark Kafka source works: it generates its own unique group.id (the spark-kafka-source-... groups in the log above), and there is no option to avoid it.

[Attached screenshot: Screen Shot 2018-11-15 at 12.42.20 PM.png]

 

This, of course, is a problem when Kafka is protected by Sentry. Is there any option to make it work without disabling Sentry?

 

 
