Support Questions

Find answers, ask questions, and share your expertise

Error in Spark Streaming - Kafka integration Structured Streaming

avatar
Expert Contributor

Hi

I am getting error "Queries with streaming sources must be executed with writeStream.start();" while running the code shown below.

Any help will be greatly appreciated.

package ca.twitter2


import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
//import org.apache.spark.streaming.kafka010._
//import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
//import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
//import kafka.serializer.StringDecoder
import java.util.HashMap
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.kafka010
//import com.datastax.spark.connector._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{explode, split}




object kafkatest3 {    
  def main(args: Array[String]) { 
      
          
       val spark = SparkSession
      .builder()
      .appName("kafkatest3")
      .master("local[*]")
         .getOrCreate()
         
               
          val topics = Array("twitter")
        


        val  ds1 = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers", "siaihdf1a.coc.ca:6667") 
        .option("subscribe", "twitter") 
          .option("startingOffsets", "earliest") 
          .load()
      
          import spark.implicits._
 
   val df = ds1.selectExpr("CAST(key AS STRING)", "CAST( value AS STRING)").as[(String, String)]
   ds1.printSchema()
   
   df.createOrReplaceTempView("df");
      val records = spark.sql ("SELECT count(*) from df GROUP BY key")  
       records.show()
 
   
   val query =   records.writeStream.outputMode("complete").format("console").start()        
          
            
    query.awaitTermination()   
    spark.stop()
     }
}



Thanks,

CHandra

2 REPLIES 2

avatar
@chandramouli muthukumaran

Could you please add the complete error trace?

avatar

I wrote about this in my Spark Structured Streaming blog here: https://www.linkedin.com/pulse/spark-21-structured-streaming-databricks-laurent-weichberger

See this sample:

val query = inactive.writeStream
.format("parquet")
.option("path", "/com/infotrellis/spark")
.option("checkpointLocation", "/com/infotrellis/check")
.start()

query.awaitTermination()