Created 06-06-2017 09:59 PM
Hi,
I am getting the error "Queries with streaming sources must be executed with writeStream.start();" while running the code shown below.
Any help will be greatly appreciated.
package ca.twitter2

import org.apache.spark.sql.SparkSession

object kafkatest3 {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("kafkatest3")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val topics = Array("twitter")

    val ds1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "siaihdf1a.coc.ca:6667")
      .option("subscribe", "twitter")
      .option("startingOffsets", "earliest")
      .load()

    val df = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    ds1.printSchema()
    df.createOrReplaceTempView("df")

    val records = spark.sql("SELECT count(*) FROM df GROUP BY key")
    records.show()

    val query = records.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()

    spark.stop()
  }
}
Thanks,
CHandra
Created 06-07-2017 06:34 PM
Could you please add the complete error trace?
Created 06-09-2017 10:57 PM
I wrote about this in my Spark Structured Streaming blog here: https://www.linkedin.com/pulse/spark-21-structured-streaming-databricks-laurent-weichberger
See this sample:
// "inactive" is a streaming DataFrame built earlier in the blog post.
val query = inactive.writeStream
  .format("parquet")
  .option("path", "/com/infotrellis/spark")
  .option("checkpointLocation", "/com/infotrellis/check")
  .start()

query.awaitTermination()
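Applied to the code in the question, the call that raises the exception is records.show(): eager actions like show() and collect() cannot run on a DataFrame with a streaming source, and output has to go through writeStream.start() instead. Here is a minimal sketch of the corrected tail of the main method, keeping the same topic, broker, and console sink as the original (untested against that cluster):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("kafkatest3")
  .master("local[*]")
  .getOrCreate()

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "siaihdf1a.coc.ca:6667")
  .option("subscribe", "twitter")
  .option("startingOffsets", "earliest")
  .load()

val df = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.createOrReplaceTempView("df")
val records = spark.sql("SELECT key, count(*) AS cnt FROM df GROUP BY key")

// No records.show() here: on a streaming DataFrame the output must go
// through writeStream. outputMode("complete") is required because
// records is an aggregation.
val query = records.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

Note that the console sink works without an explicit checkpointLocation while testing, whereas a file sink such as parquet (as in the sample above) requires one.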