Created 12-21-2018 07:28 AM
Hello Friends,
We have an upcoming project, and for it I am learning Spark Streaming (with a focus on Structured Streaming).
So far I have completed a few simple case studies found online, but I am stuck on two scenarios, described below.
I would be grateful if you could share your thoughts or point me to any web page that helps with a solution.
1. Writing Streaming Aggregation to File
# spark-submit --master local[*] /home/training/santanu/ssfs_2.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()
source = "file:///home/training/santanu/logs"
tgt = "file:///home/training/santanu/hdfs"
chk = "file:///home/training/santanu/checkpoint"
schema1 = StructType([StructField('agent', StringType()), StructField('code', IntegerType())])
df1 = spark.readStream.csv(source, schema=schema1, sep=",")
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count()
df3 = df2.select("agent", "count").withColumnRenamed("count", "group_count")
query = df3.writeStream.format("csv").option("path", tgt).option("checkpointLocation", chk).start()  # Error
query.awaitTermination()
spark.stop()
Error I am getting:
# Append output mode not supported when there are streaming aggregations on DataFrames without watermark;
2. Reading from Kafka (Consumer) using Streaming
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/sskr.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "MyTopic_1").load()
print(type(df))  # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema()  # printing schema hierarchy
query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()  # Error
query.awaitTermination()
spark.stop()
Error I am getting:
# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)
Please Help.
Thanking you
Santanu Ghosh
Created 01-06-2019 01:05 PM
Friends, is there any update on these two questions? Sadly, after so many days there is still no reply.
Regards
Created 06-06-2019 05:53 AM
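1. Use a column of your DataFrame that contains a timestamp (or create a new one) and mark that column as the watermark with .withWatermark(eventTime, delayThreshold). For append mode to accept the aggregation, the groupBy also has to include that event-time column or a window over it.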
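2. Keep
.option("subscribe","MyTopic_1")
as it is. The Kafka source expects subscribe, subscribePattern, or assign, whereas
.option("topic","MyTopic_1")
applies only when writing to Kafka. A NoSuchMethodError on KafkaConsumer.subscribe(Ljava/util/Collection;) usually means an older kafka-clients jar (before 0.10) is being picked up at runtime, so check that the Kafka client on the classpath and the spark-sql-kafka-0-10 package match your Spark and Scala versions.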
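For point 1, a minimal sketch of the watermark approach could look like the code below. It is not a tested fix for the original job. The question's schema has no timestamp, so the sketch assumes a processing-time column named event_time is added with current_timestamp(). The helper names (windowed_counts, start_csv_sink), the 10-minute delay, and the 5-minute window are illustrative choices, not values from the original post. The window struct is flattened into two plain columns because the CSV sink cannot write struct columns.

```python
# Assumed values, not taken from the original post.
WATERMARK_DELAY = "10 minutes"  # how long late rows are still accepted
WINDOW_SIZE = "5 minutes"       # length of each aggregation window


def windowed_counts(df1, delay=WATERMARK_DELAY, window_size=WINDOW_SIZE):
    """Count rows with code > 300 per agent and time window.

    df1 is expected to be the streaming DataFrame from the question
    (columns: agent, code).
    """
    # Imported inside the function so the file loads even without Spark.
    from pyspark.sql import functions as F

    # Hypothetical event-time column: the arrival time of each row.
    stamped = df1.withColumn("event_time", F.current_timestamp())

    return (
        stamped.filter("code > 300")
        # The watermark must be set on the same column used for grouping.
        .withWatermark("event_time", delay)
        .groupBy(F.window("event_time", window_size), "agent")
        .count()
        # Flatten the window struct so the result can be written as CSV.
        .select(
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            "agent",
            F.col("count").alias("group_count"),
        )
    )


def start_csv_sink(counts, path, checkpoint):
    """Start the file sink in append mode, the only mode file sinks support."""
    return (
        counts.writeStream.format("csv")
        .outputMode("append")
        .option("path", path)
        .option("checkpointLocation", checkpoint)
        .start()
    )
```

With the variables from the question, this would be used as query = start_csv_sink(windowed_counts(df1), tgt, chk). In append mode a window is written only after the watermark has passed its end, so with these assumed settings rows should not be expected in the output until roughly window size plus delay after the data arrives.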