
Spark-streaming with Kafka 2.0

Explorer

Hi,

I am trying to develop a Spark consumer using Spark 2.0. I have created the Spark session and built the DataFrame as below:

val logLinesDStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("zookeeper.connect", "localhost:2181")
  .option("subscribe", topics)
  .load()

val ds1 = logLinesDStream.selectExpr("CAST(value AS STRING)").as[String]

ds1 has the schema [value: string], and value contains Apache web log records, so I want to convert each record to this case class:

case class MyApacheLog(ipAdd: String, clientId: String, userId: String, dateTime: String, method: String, requestURI: String, protocol: String, responseCde: Int, cLen: Long)

I tried the code below, but it does not work:

val dataRDD = ds1.rdd
val ip_address = dataRDD.map(record => {
  val columns = record.split(" ")
  columns(1)
})

It fails with:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka

Could you suggest a better, workable approach? Thank you.

3 REPLIES

Re: Spark-streaming with Kafka 2.0

Explorer

How to convert Dataset[Row] to Dataset[MyApacheLog]?

Re: Spark-streaming with Kafka 2.0

Super Guru

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/stream...

http://spark.apache.org/docs/latest/streaming-programming-guide.html

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

In Spark 2.0 Structured Streaming, a query over a streaming source must be started by calling start() on its writeStream; actions like .rdd on a streaming Dataset are not allowed, which is exactly what the AnalysisException is telling you.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-q...
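For instance, a minimal sketch of starting a query (assuming df is a streaming DataFrame; the console sink and append output mode are just illustrative choices):

val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()               // nothing executes until start() is called

query.awaitTermination() // block until the query terminates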

See an example:

case class DeviceData(device: String, deviceType: String, signal: Double, time: String)

val df: DataFrame = ... // streaming DataFrame with IoT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IoT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)        // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()              // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

see: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-dataframe.html
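Applied to the Kafka stream from your question, a rough sketch of the conversion (assuming the log lines are strictly space-delimited; a real Apache access log needs a regex for the bracketed timestamp and the quoted request line):

import org.apache.spark.sql.Dataset
import spark.implicits._

// define the case class at top level so Spark can derive an Encoder for it
case class MyApacheLog(ipAdd: String, clientId: String, userId: String,
  dateTime: String, method: String, requestURI: String,
  protocol: String, responseCde: Int, cLen: Long)

// ds1 is the Dataset[String] from the question
val logs: Dataset[MyApacheLog] = ds1.map { line =>
  val c = line.split(" ")
  MyApacheLog(c(0), c(1), c(2), c(3), c(4), c(5), c(6), c(7).toInt, c(8).toLong)
}

// instead of ds1.rdd, keep the logic as Dataset operations and start the query
logs.map(_.ipAdd).writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()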

Re: Spark-streaming with Kafka 2.0

Explorer

Thanks. I tried the IoT example to convert to the object type. Let me retry.