
Spark Structured Streaming - File Sink and Kafka


Hello Friends,

 

We have an upcoming project, and for it I am learning Spark Structured Streaming (with a focus on PySpark).

 

So far I have completed a few simple case studies found online.

But I am stuck on the two scenarios described below.

 

I would be very grateful if you could share your thoughts or point me to any page that covers a solution.

 

1. Writing Streaming Aggregation to File

 

# spark-submit --master local[*] /home/training/santanu/ssfs_2.py
 
from pyspark.sql import SparkSession 
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
 
spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()
 
source = "file:///home/training/santanu/logs" 
tgt = "file:///home/training/santanu/hdfs" 
chk = "file:///home/training/santanu/checkpoint"
 
schema1 = StructType([StructField('agent',StringType()),StructField('code',IntegerType())])
 
df1 = spark.readStream.csv(source,schema=schema1,sep=",") 
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count() 
df3 = df2.select("agent","count").withColumnRenamed("count","group_count")
 
query = df3.writeStream.format("csv").option("path",tgt).option("checkpointLocation",chk).start()   # Error
 
query.awaitTermination() 
spark.stop()

 

Error message :

# AnalysisException: Append output mode not supported when there are streaming aggregations on DataFrames without watermark;
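
From what I have read so far, append mode (which the file sink requires) only allows a streaming aggregation when that aggregation is tied to an event-time watermark. So one direction I am considering is to watermark on a timestamp column and group by a time window. Below is a rough sketch of that idea; the event_time column and the 10-minute window are my own assumptions (my current log files do not carry a timestamp), so please correct me if this is the wrong approach.

# assumes the CSV files also contain an event-time column named event_time
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,TimestampType

spark = SparkSession.builder.appName("FileStream_Sink_Group_Watermark").getOrCreate()

source = "file:///home/training/santanu/logs"
tgt = "file:///home/training/santanu/hdfs"
chk = "file:///home/training/santanu/checkpoint"

schema1 = StructType([StructField('agent',StringType()),
                      StructField('code',IntegerType()),
                      StructField('event_time',TimestampType())])   # assumed extra column

df1 = spark.readStream.csv(source,schema=schema1,sep=",")

# watermark on the event-time column, then count per 10-minute window and agent
df2 = (df1.filter("code > 300")
          .withWatermark("event_time","10 minutes")
          .groupBy(window("event_time","10 minutes"),"agent")
          .count()
          .withColumnRenamed("count","group_count"))

# flatten the window struct, since the CSV sink cannot write struct columns
df3 = df2.selectExpr("window.start as window_start","window.end as window_end","agent","group_count")

# with the watermark in place, append mode is accepted for the file sink
query = (df3.writeStream.format("csv")
            .option("path",tgt)
            .option("checkpointLocation",chk)
            .outputMode("append")
            .start())

query.awaitTermination()
spark.stop()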

 

2. Reading from Kafka (Consumer) using Streaming

 

# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/sskr.py
 
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()
 
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","MyTopic_1").load()
 
print(type(df))       # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema()      # printing schema hierarchy
 
query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()   # Error
 
query.awaitTermination()
spark.stop() 

 

Error message :

# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)
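
From searching on this error, it seems to be a classpath/version problem rather than a problem in the code: the spark-sql-kafka-0-10 connector expects the 0.10.x (or newer) kafka-clients API, and this NoSuchMethodError shows up when an older 0.8/0.9 kafka-clients jar on the cluster gets picked up first. One thing I intend to try is pinning the newer client explicitly and letting the user classpath win; the exact kafka-clients version and the confs below are my guess based on Spark 2.2.0, not something confirmed yet.

# spark-submit \
#   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.kafka:kafka-clients:0.10.0.1 \
#   --conf spark.driver.userClassPathFirst=true \
#   --conf spark.executor.userClassPathFirst=true \
#   /home/training/santanu/sskr.py
# (kafka-clients version above is an assumption; it should match the broker/client actually installed)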

 

Please Help.

 

Thanking you

 

Santanu Ghosh

Re: Spark Structured Streaming - File Sink and Kafka

Friends, any update on these two questions? Sadly, after many days there is still no reply.

 

Regards
