11-04-2018 03:10 AM
Could you elaborate on what you are trying to achieve and how? What kind of errors are you receiving?
I'm assuming you are talking about setting up a Spark/Kafka integration using Python as the Spark language, i.e. reading a Kafka topic from PySpark through Structured Streaming's kafka source.
Could you confirm or amend?
Cloudera's distribution of Kafka officially supports Flume, Spark, and Java clients, as these have been tested by our development team. However, Spark Structured Streaming is currently untested and unsupported.
11-04-2018 11:37 PM
I'm trying to use Structured Streaming with Python, but all my attempts have been unsuccessful.
Please read this post:
This error was the first one I got, so I attempted to change the jar to spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar, and the error now is the one shown in the attached picture.
There is no way to move forward from this point, so we started to migrate to Direct Stream. With this last option we also found other issues in Python, so now we are using Scala/Java code. But this is not a problem.
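For context, by Direct Stream I mean the DStream-based integration from spark-streaming-kafka-0-10, roughly as sketched below (broker address, group id and topic name are placeholders, not our real values; the spark-streaming-kafka-0-10 jar must be on the classpath):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// All values below are placeholders.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my_consumer_group",   // with DStreams the consumer group is set explicitly
  "auto.offset.reset" -> "latest"
)

val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("my_topic"), kafkaParams)
)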
If Structured Streaming can be made available, I think it will be a great option for all Cloudera users.
11-15-2018 01:02 AM - edited 11-15-2018 03:45 AM
I switched from Python to Scala, which is better supported since Spark itself is written in Scala.
I kept trying, following a suggestion from a remote friend, and found that the following works fine. So Structured Streaming is not supported, but it is working:
scala> import org.apache.spark.sql.Encoders

scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)

scala> val schema = Encoders.product[Amazon].schema

scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]
data: org.apache.spark.sql.Dataset[Amazon] = [EventId: string, DOCOMOEntitlementId: string ...

scala> data.isStreaming
res0: Boolean = true

scala> data.writeStream.outputMode("append").format("console")
res1: org.apache.spark.sql.streaming.DataStreamWriter[Amazon] = org.apache.spark.sql.streaming.DataStreamWriter@7dc56c0a

scala> res1.start()
On the other hand, I also tried to read directly from Kafka.
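The read was along these lines (broker address and topic name are placeholders, not my real values):

// Structured Streaming read from the kafka source; requires the
// spark-sql-kafka-0-10 connector jar on the classpath.
val kafkaData = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("subscribe", "my_topic")                     // placeholder
  .load()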
This led to the following issue:
java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 50 more
Are there any jars from Cloudera that can solve this issue?
If there are no jars from Cloudera, which one is the best match?
Well, after a while I found that the library I need is
And everything goes almost fine, but...
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0
18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1
18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2
This depends on how the Kafka integration code works, and there is no option to avoid it.
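A minimal sketch of what I mean, assuming the Spark 2.x Kafka source behaviour (broker and topic are placeholders):

// The Structured Streaming Kafka source generates its own consumer group per query,
// e.g. spark-kafka-source-<uuid>-<suffix>-driver-<n>, which is the group name that
// shows up in the Sentry warnings above.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("subscribe", "my_topic")                     // placeholder
  // .option("kafka.group.id", "my_group")             // rejected in Spark 2.x:
  //   "Kafka option 'group.id' is not supported"
  .load()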
This, of course, is a problem when Kafka is under Sentry. Is there any option to have it working without disabling Sentry?