
Spark Structured Streaming - File Sink and Kafka


Hello Friends,

 

We have an upcoming project, and in preparation I am learning Spark Structured Streaming (with a focus on PySpark).

 

So far I have completed a few simple case studies found online.

However, I am stuck on the two scenarios described below.

 

I would be much obliged if you could share your thoughts or point me to any web page that covers a solution.

 

1. Writing Streaming Aggregation to File

 

# spark-submit --master local[*] /home/training/santanu/ssfs_2.py

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()

source = "file:///home/training/santanu/logs"        # input directory (streaming CSV source)
tgt = "file:///home/training/santanu/hdfs"           # output directory (file sink)
chk = "file:///home/training/santanu/checkpoint"     # checkpoint directory

schema1 = StructType([StructField('agent', StringType()), StructField('code', IntegerType())])

# Read CSV files as a stream, keep rows with code > 300, and count rows per agent
df1 = spark.readStream.csv(source, schema=schema1, sep=",")
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count()
df3 = df2.select("agent", "count").withColumnRenamed("count", "group_count")

query = df3.writeStream.format("csv").option("path", tgt).option("checkpointLocation", chk).start()   # Error

query.awaitTermination()
spark.stop()

 

Error message :

# AnalysisException: Append output mode not supported when there are streaming aggregations on DataFrames without watermark;

 

2. Reading from Kafka (Consumer) using Streaming

 

# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/sskr.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()

# Subscribe to the Kafka topic as a streaming source
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "MyTopic_1").load()

print(type(df))       # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema()      # prints the Kafka source schema (key, value, topic, partition, offset, ...)

# Cast the message value to a string and write it to the console sink
query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()   # Error

query.awaitTermination()
spark.stop()

 

Error message :

# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)

 

Please Help.

 

Thanking you

 

Santanu Ghosh


2 Replies

Explorer

Friends, is there any update on these two questions? Sadly, after many days there is still no reply.

 

Regards

Contributor

Hello,

 

1. Writing Streaming Aggregation to File

 

In order to use append output mode with streaming aggregations, you need to set an event-time watermark (using "withWatermark"). Otherwise, Spark doesn't know when an aggregation result can be emitted as "final".

 

A watermark is a threshold that specifies how long the system waits for late events before an aggregation result is finalized.

 

For example, assuming your data includes an event-time column (called "timestamp" here), apply the watermark before the aggregation and include the event-time column in the grouping, typically through a time window (window is in pyspark.sql.functions):

df2 = df1.filter("code > 300").withWatermark("timestamp", "2 minutes").groupBy(window("timestamp", "5 minutes"), "agent").count()
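
Below is a fuller sketch of how your first script could look with a watermark applied. It is only illustrative: it assumes your log files actually carry an event-time column (named "event_time" here, typed as TimestampType), which your current schema does not have, so you would need to add or derive one. The struct-typed window column is also flattened before writing, since the CSV file sink cannot store struct columns.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()

source = "file:///home/training/santanu/logs"
tgt = "file:///home/training/santanu/hdfs"
chk = "file:///home/training/santanu/checkpoint"

# Assumed schema: the "event_time" column is hypothetical and must exist in your data
schema1 = StructType([StructField('agent', StringType()),
                      StructField('code', IntegerType()),
                      StructField('event_time', TimestampType())])

df1 = spark.readStream.csv(source, schema=schema1, sep=",")

# Watermark on the event-time column, then count per 5-minute window and agent
df2 = (df1.filter("code > 300")
          .withWatermark("event_time", "2 minutes")
          .groupBy(window("event_time", "5 minutes"), "agent")
          .count()
          .withColumnRenamed("count", "group_count"))

# Flatten the struct-typed "window" column so the CSV sink can write the result
df3 = df2.select(col("window.start").alias("window_start"), "agent", "group_count")

query = (df3.writeStream.outputMode("append").format("csv")
            .option("path", tgt)
            .option("checkpointLocation", chk)
            .start())

query.awaitTermination()
spark.stop()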

 

2. Reading from Kafka (Consumer) using Streaming

 

You have to set the SPARK_KAFKA_VERSION environment variable.

 

When running jobs that require the new Kafka integration, set SPARK_KAFKA_VERSION=0.10 in the shell before launching spark-submit.

 

# Set the environment variable for the duration of your shell session:
export SPARK_KAFKA_VERSION=0.10
spark-submit <arguments>
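
For instance, with the script from your question, the invocation could look like this (keeping your original --packages option; the artifact version shown is taken from your command and should match your cluster's Spark/Scala build):

export SPARK_KAFKA_VERSION=0.10
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/sskr.py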

 

https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html