
Spark Structured Streaming - File Sink and Kafka


Hello Friends,


We have an upcoming project, and for it I am learning Spark Streaming (with a focus on PySpark).


So far I have completed a few simple case studies online.

But I am stuck on two scenarios, which are described below.


I shall be highly obliged if you could kindly share your thoughts or point me to any web page that helps with a solution.


1. Writing Streaming Aggregation to File


# spark-submit --master local[*] /home/training/santanu/
from pyspark.sql import SparkSession 
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()
source = "file:///home/training/santanu/logs" 
tgt = "file:///home/training/santanu/hdfs" 
chk = "file:///home/training/santanu/checkpoint"
schema1 = StructType([StructField('agent',StringType()),StructField('code',IntegerType())])
df1 = spark.readStream.csv(source,schema=schema1,sep=",") 
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count() 
df3 = df2.select("agent","count").withColumnRenamed("count","group_count")
query = df3.writeStream.format("csv").option("path",tgt).option("checkpointLocation",chk).start()   # Error


Error message :

# AnalysisException: Append output mode not supported when there are streaming aggregations on DataFrames without watermark;


2. Reading from Kafka (Consumer) using Streaming


# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","MyTopic_1").load()
print(type(df))       # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema()      # printing schema hierarchy
query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()   # Error


Error message :

# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)


Please Help.


Thanking you


Santanu Ghosh









Friends, any update on these two questions? Sadly, after many days there is still no reply.






1. Writing Streaming Aggregation to File


In order to use append mode with aggregations, you need to set an event time watermark (using "withWatermark"). Otherwise, Spark doesn't know when to output an aggregation result as "final".


A watermark is a threshold to specify how long the system waits for late events.


For example:

df2 = df1.filter("code > 300").select("agent").withWatermark("timestamp", "2 minutes").groupBy("agent").count()


2. Reading from Kafka (Consumer) using Streaming


You have to set the SPARK_KAFKA_VERSION environment variable.


When running jobs that require the new Kafka integration, set SPARK_KAFKA_VERSION=0.10 in the shell before launching spark-submit.


# Set the environment variable for the duration of your shell session:

export SPARK_KAFKA_VERSION=0.10
spark-submit arguments