New Contributor
Posts: 6
Registered: ‎09-11-2018

Spark Structured Streaming - File Sink and Kafka

[ Edited ]

Hello Friends,


We have an upcoming project, and in preparation I am learning Spark Streaming (with a focus on PySpark).


So far I have completed a few simple case studies found online.

But I am stuck on the two scenarios described below.


I would be highly obliged if you could share your thoughts or point me to any web page that might help.


1. Writing Streaming Aggregation to File


# spark-submit --master local[*] /home/training/santanu/
from pyspark.sql import SparkSession 
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()
source = "file:///home/training/santanu/logs" 
tgt = "file:///home/training/santanu/hdfs" 
chk = "file:///home/training/santanu/checkpoint"
schema1 = StructType([StructField('agent',StringType()),StructField('code',IntegerType())])
df1 = spark.readStream.csv(source,schema=schema1,sep=",") 
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count() 
df3 = df2.select("agent","count").withColumnRenamed("count","group_count")
query = df3.writeStream.format("csv").option("path",tgt).option("checkpointLocation",chk).start()   # Error


Error message :

# AnalysisException: Append output mode not supported when there are streaming aggregations on DataFrames without watermark;


2. Reading from Kafka (Consumer) using Streaming


# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","MyTopic_1").load()
print(type(df))       # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema()      # printing schema hierarchy
query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()   # Error


Error message :

# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)
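This particular `NoSuchMethodError` usually means an older kafka-clients jar (the 0.8/0.9 consumer API, where `subscribe` took a `List` rather than a `Collection`) is on the classpath ahead of the 0.10+ client that `spark-sql-kafka-0-10` needs, or the `--packages` version does not match the installed Spark build. A sketch of the submit command, assuming Spark 2.2.0 built for Scala 2.11 (match these to your actual install) and a hypothetical script name, since the original path is truncated:

```
# Versions and the script name below are assumptions -- adjust to your cluster.
spark-submit \
  --master local[*] \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.kafka:kafka-clients:0.10.0.1 \
  /home/training/santanu/kafka_consumer.py
```

It is also worth checking the Spark classpath (e.g. the jars directory, or anything added via `spark.driver.extraClassPath`) for a stray pre-0.10 Kafka jar that could shadow the pinned client.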


Please Help.


Thanking you


Santanu Ghosh







New Contributor
Posts: 6
Registered: ‎09-11-2018

Re: Spark Structured Streaming - File Sink and Kafka

Friends, any update on these two questions? Sadly, after many days there is still no reply.