Created 06-06-2017 09:59 PM
Hi
I am getting the error "Queries with streaming sources must be executed with writeStream.start();" while running the code shown below.
Any help will be greatly appreciated.
package ca.twitter2

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
//import org.apache.spark.streaming.kafka010._
//import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
//import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import java.util.HashMap
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.kafka010
//import com.datastax.spark.connector._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{explode, split}

object kafkatest3 {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .appName("kafkatest3")
      .master("local[*]")
      .getOrCreate()

    val topics = Array("twitter")

    val ds1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "siaihdf1a.coc.ca:6667")
      .option("subscribe", "twitter")
      .option("startingOffsets", "earliest")
      .load()

    import spark.implicits._
    val df = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    ds1.printSchema()

    df.createOrReplaceTempView("df")
    val records = spark.sql("SELECT count(*) FROM df GROUP BY key")
    records.show()   // show() is a batch action; on a streaming Dataset it throws the AnalysisException

    val query = records.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()
    spark.stop()
  }
}
Thanks,
CHandra
Created 06-07-2017 06:34 PM
Could you please add the complete error trace?
Created 06-09-2017 10:57 PM
I wrote about this in my Spark Structured Streaming blog here: https://www.linkedin.com/pulse/spark-21-structured-streaming-databricks-laurent-weichberger
See this sample:
val query = inactive.writeStream
  .format("parquet")
  .option("path", "/com/infotrellis/spark")
  .option("checkpointLocation", "/com/infotrellis/check")
  .start()

query.awaitTermination()
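In the code in the original post, the exception is raised by records.show(): show() is a batch action, and a Dataset backed by a streaming source can only be materialized by starting a query with writeStream ... start(). A minimal sketch of the fix, reusing the console sink already present in your code (the added cnt alias and the key column in the select list are just for readability):

// Drop the records.show() call and let the console sink print each
// update of the streaming aggregation instead.
val records = spark.sql("SELECT key, count(*) AS cnt FROM df GROUP BY key")

// "complete" output mode re-emits the full aggregation result on every trigger,
// which is what an un-watermarked streaming GROUP BY requires.
val query = records.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()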