
Spark Structured Streaming - Integration with File Sink and Kafka

Explorer

Hello Friends,

We have an upcoming project, and for that I am learning Spark Streaming (with a focus on Structured Streaming).

So far I have completed a few simple case studies online, but I am stuck on the 2 scenarios described below.

I would be highly obliged if you could kindly share your thoughts or point me to any web page that helps with a solution.

1. Writing Streaming Aggregation to File

# spark-submit --master local[*] /home/training/santanu/ssfs_2.py

from pyspark.sql import SparkSession 
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()

source = "file:///home/training/santanu/logs" 
tgt = "file:///home/training/santanu/hdfs" 
chk = "file:///home/training/santanu/checkpoint"

schema1 = StructType([StructField('agent',StringType()),StructField('code',IntegerType())])

df1 = spark.readStream.csv(source,schema=schema1,sep=",") 
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count() 
df3 = df2.select("agent","count").withColumnRenamed("count","group_count")

query = df3.writeStream.format("csv").option("path",tgt).option("checkpointLocation",chk).start()   # Error

query.awaitTermination() 
spark.stop()  

The error I am getting:

# Append output mode not supported when there are streaming aggregations on DataFrames without watermark;

2. Reading from Kafka (Consumer) using Streaming

# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/sskr.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","MyTopic_1").load()

print(type(df))       # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema()      # printing schema hierarchy

query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()   # Error

query.awaitTermination()
spark.stop() 

The error I am getting:

# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)

Please help.

Thanking you

Santanu Ghosh

2 REPLIES

Explorer

Friends, any update on these 2 questions? Sadly, after so many days there is still no reply.

Regards

New Contributor

1. You should use a column of your DataFrame that contains a timestamp (or create a new one) and then mark that column as the watermark using .withWatermark(); a sketch follows below.
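For illustration, a minimal sketch of what that fix might look like. This is assumption-heavy: your log rows carry no timestamp, so current_timestamp() (processing time) stands in as the event time, and the event_time column name and the 10-minute window are purely illustrative. Note also that the window struct must be flattened before writing, since the CSV sink cannot write struct columns.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()

source = "file:///home/training/santanu/logs"
tgt = "file:///home/training/santanu/hdfs"
chk = "file:///home/training/santanu/checkpoint"

schema1 = StructType([StructField('agent', StringType()),
                      StructField('code', IntegerType())])

df1 = spark.readStream.csv(source, schema=schema1, sep=",")

# The file sink only supports append mode, and append mode with a
# streaming aggregation needs a watermark so Spark knows when an
# aggregated row is final. Grouping by a window over the watermarked
# column lets Spark emit each window once the watermark passes it.
agg = (df1.filter("code > 300")
          .withColumn("event_time", current_timestamp())
          .withWatermark("event_time", "10 minutes")
          .groupBy(window("event_time", "10 minutes"), "agent")
          .count())

# Flatten the window struct: the CSV sink cannot write struct columns.
df3 = agg.select(col("window.start").alias("window_start"),
                 "agent",
                 col("count").alias("group_count"))

query = (df3.writeStream
            .format("csv")
            .option("path", tgt)
            .option("checkpointLocation", chk)
            .outputMode("append")
            .start())

query.awaitTermination()
spark.stop()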

2. Change

.option("subscribe","MyTopic_1")

to

.option("topic","MyTopic_1")

and you're good to go.
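That said, per the Spark 2.2 documentation "subscribe" is the option for the Kafka source, while "topic" applies to the Kafka sink, so if the error persists it is worth checking the kafka-clients jars on the classpath instead: that NoSuchMethodError typically means an older (0.8.x) client jar is shadowing the 0.10.x client the connector expects. For reference, a minimal consumer sketch (broker, topic name, and package version taken from the original post):

# Submit with the Kafka package matching the installed Spark build, e.g.
# (assuming Spark 2.2.0 / Scala 2.11, as in the original command):
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 sskr.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()

# "subscribe" names the topic(s) to read from; "topic" would only apply
# when writing back to Kafka with writeStream.
df = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", "MyTopic_1")
           .load())

# Kafka key/value arrive as binary, so cast before printing to console.
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
           .writeStream
           .format("console")
           .outputMode("append")
           .start())

query.awaitTermination()
spark.stop()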
