Created on 12-21-2018 07:57 PM - edited 09-16-2022 07:00 AM
Hello Friends,
We have an upcoming project, and for it I am learning Spark Streaming (with a focus on PySpark).
So far I have completed a few simple case studies from online material.
But I am stuck on the 2 scenarios described below.
I shall be highly obliged if you kindly share your thoughts or point me to any web page that helps with a solution.
1. Writing Streaming Aggregation to File
# spark-submit --master local[*] /home/training/santanu/ssfs_2.py

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()

source = "file:///home/training/santanu/logs"
tgt = "file:///home/training/santanu/hdfs"
chk = "file:///home/training/santanu/checkpoint"

schema1 = StructType([StructField('agent', StringType()), StructField('code', IntegerType())])

df1 = spark.readStream.csv(source, schema=schema1, sep=",")
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count()
df3 = df2.select("agent", "count").withColumnRenamed("count", "group_count")

query = df3.writeStream.format("csv").option("path", tgt).option("checkpointLocation", chk).start()  # Error

query.awaitTermination()
spark.stop()
Error message :
# AnalysisException: Append output mode not supported when there are streaming aggregations on DataFrames without watermark;
2. Reading from Kafka (Consumer) using Streaming
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/sskr.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "MyTopic_1") \
    .load()

print(type(df))   # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema()  # printing schema hierarchy

query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()  # Error

query.awaitTermination()
spark.stop()
Error message :
# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)
Please Help.
Thanking you
Santanu Ghosh
Created 01-06-2019 05:07 AM
Friends, any update on these 2 questions? Sadly, after many days there is still no reply.
Regards
Created on 02-05-2019 08:28 PM - edited 02-05-2019 08:30 PM
Hello,
1. Writing Streaming Aggregation to File
To use append mode with aggregations, you need to set an event-time watermark (using "withWatermark"). Otherwise, Spark doesn't know when an aggregation result can be emitted as "final".
A watermark is a threshold that tells the system how long to wait for late events.
Two things to note for your query: the watermark must be applied before the groupBy, and it must reference an event-time (timestamp) column that actually exists in your schema. Your current schema (agent, code) has no such column, so you would need to add one to the data. Also, since the file sink only supports append mode, the grouping has to include a window over the watermarked column. For example (assuming an event-time column called "event_time"):
df2 = df1.filter("code > 300").withWatermark("event_time", "2 minutes").groupBy(window("event_time", "5 minutes"), "agent").count()
2. Reading from Kafka (Consumer) using Streaming
You have to set the SPARK_KAFKA_VERSION environment variable.
The NoSuchMethodError indicates that the old Kafka 0.9 client jars are on the classpath (the default in CDH Spark 2), and they are incompatible with the spark-sql-kafka-0-10 integration. When running jobs that require the new Kafka integration, set SPARK_KAFKA_VERSION=0.10 in the shell before launching spark-submit.
# Set the environment variable for the duration of your shell session:
export SPARK_KAFKA_VERSION=0.10
spark-submit <arguments>
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html