Created on 03-03-2017 06:29 AM - edited 09-16-2022 04:11 AM
I have a Spark Streaming application that reads messages from Kafka using the direct (receiver-less) approach and processes them per partition in YARN cluster mode.
In my Kafka partition, some batches of 2000 messages take 20 seconds to process, while others of the same size take 7-9 seconds.
Given this fluctuation, we turned on backpressure with the following settings.
spark.batch.duration=10 seconds
spark.streaming.kafka.maxRatePerPartition=200
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=60
spark.streaming.backpressure.pid.minRate=1600
We also specified the rate estimator with the following parameters. I don't understand the mathematics of PID, but I tried different combinations; one of them is as follows.
spark.streaming.backpressure.rateEstimator=pid
spark.streaming.backpressure.pid.minRate=1600
spark.streaming.backpressure.pid.integral=1
spark.streaming.backpressure.pid.proportional=25
spark.streaming.backpressure.pid.derived=1
Initially, Spark reads 2000 messages per RDD for one partition, but after some time it starts reading 800 records, which I think is minRate/2, and then it stays static. In the logs, it always prints 1600 as the new rate.
2017-01-20 14:55:14 TRACE PIDRateEstimator:67 - New rate = 1600.0
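For readers unsure about the PID mathematics, here is a simplified sketch of one update step, modelled on the update rule in Spark's PIDRateEstimator as I understand it. The function and parameter names are illustrative, the default gains (1.0, 0.2, 0.0) and default floor (100) are assumptions to check against your Spark version, and the real class also handles first-run and validity checks that are omitted here.

```python
def pid_new_rate(latest_rate, latest_error, num_elements,
                 processing_delay_ms, scheduling_delay_ms,
                 batch_interval_ms, delay_since_update_s,
                 proportional=1.0, integral=0.2, derived=0.0,
                 min_rate=100.0):
    """One simplified PID step. All rates are in records per second.

    Returns (new_rate, error) so the caller can feed both into the next step.
    """
    # Rate the last batch was actually processed at.
    processing_rate = num_elements * 1000.0 / processing_delay_ms
    # Positive error means we ingested faster than we could process.
    error = latest_rate - processing_rate
    # Backlog implied by scheduling delay, expressed as a rate.
    historical_error = scheduling_delay_ms * processing_rate / batch_interval_ms
    # Change in error since the previous update.
    d_error = (error - latest_error) / delay_since_update_s
    new_rate = (latest_rate
                - proportional * error
                - integral * historical_error
                - derived * d_error)
    # The estimate is never allowed to fall below the configured floor.
    return max(new_rate, min_rate), error


# Settings from the post: proportional=25, integral=1, derived=1, minRate=1600.
# A slow batch: 2000 records in 20 s on a 10 s batch interval.
slow_rate, slow_error = pid_new_rate(
    latest_rate=1600.0, latest_error=0.0, num_elements=2000,
    processing_delay_ms=20000, scheduling_delay_ms=0,
    batch_interval_ms=10000, delay_since_update_s=10.0,
    proportional=25.0, integral=1.0, derived=1.0, min_rate=1600.0)

# A fast batch: 2000 records in 8 s, same settings.
fast_rate, fast_error = pid_new_rate(
    latest_rate=1600.0, latest_error=0.0, num_elements=2000,
    processing_delay_ms=8000, scheduling_delay_ms=0,
    batch_interval_ms=10000, delay_since_update_s=10.0,
    proportional=25.0, integral=1.0, derived=1.0, min_rate=1600.0)

print(slow_rate, fast_rate)
```

Under these assumptions, both the slow and the fast batch are processed at far fewer than 1600 records per second, so the error is large and positive and the computed rate is clamped to the floor every time. That would be consistent with the log always printing 1600.0: a minRate above the achievable processing rate pins the estimate at minRate regardless of the gains.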
Given my scenario, I have a few questions:
Created 02-22-2020 11:35 AM
Hi,
I am facing the same problem. Did you find a solution?
Best,
Helmi Khalifa
Created on 02-24-2020 06:14 PM - edited 02-24-2020 10:15 PM
I tried spark.streaming.backpressure.pid.minRate, which works as expected.
My configuration:
spark.shuffle.service.enabled: "true"
spark.streaming.kafka.maxRatePerPartition: "600"
spark.streaming.backpressure.enabled: "true"
spark.streaming.concurrentJobs: "1"
spark.executor.extraJavaOptions: "-XX:+UseConcMarkSweepGC"
batch.duration: 5
spark.streaming.backpressure.pid.minRate: 2000
So, by configuration, it starts with 15 (total number of partitions) X 600 (maxRatePerPartition) X 5 (batch duration) = 45,000 records per batch, but it is not able to process that many records in 5 seconds. It then drops to ~10,000 = 2000 (pid.minRate) X 5 (batch duration).
So spark.streaming.backpressure.pid.minRate is the total number of records per second, not a per-partition value.
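The arithmetic above can be checked with a small sketch. The helper names are hypothetical; the numbers are the ones from this configuration, and treating pid.minRate as a total (not per-partition) rate is the conclusion drawn from the observed behavior, not something taken from the Spark documentation.

```python
def max_records_per_batch(num_partitions, max_rate_per_partition, batch_seconds):
    """Upper bound per batch imposed by spark.streaming.kafka.maxRatePerPartition."""
    return num_partitions * max_rate_per_partition * batch_seconds


def floor_records_per_batch(pid_min_rate, batch_seconds):
    """Lower bound per batch if pid.minRate is a total records-per-second rate."""
    return pid_min_rate * batch_seconds


upper = max_records_per_batch(num_partitions=15,
                              max_rate_per_partition=600,
                              batch_seconds=5)
lower = floor_records_per_batch(pid_min_rate=2000, batch_seconds=5)

print(upper)  # 45000
print(lower)  # 10000
```

If the job cannot sustain even the lower bound within one batch interval, batches will queue up, so minRate should sit at or below the throughput the job can actually achieve.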
Just set spark.streaming.backpressure.pid.minRate and leave the following settings at their defaults:
spark.streaming.backpressure.pid.integral
spark.streaming.backpressure.pid.proportional
spark.streaming.backpressure.pid.derived
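As a minimal sketch of that advice, the snippet below builds spark-submit `--conf` arguments that set only the floor and the per-partition cap, and deliberately omits the three PID gains so Spark falls back to its defaults. The helper `to_submit_args` is hypothetical, and the values are the ones from the configuration above.

```python
def to_submit_args(conf):
    """Turn a dict of Spark properties into spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Only the settings that are changed; pid.proportional, pid.integral and
# pid.derived are intentionally absent so the defaults apply.
backpressure_conf = {
    "spark.streaming.backpressure.enabled": "true",
    "spark.streaming.kafka.maxRatePerPartition": "600",
    "spark.streaming.backpressure.pid.minRate": "2000",
}

submit_args = to_submit_args(backpressure_conf)
print(" ".join(submit_args))
```

The resulting list can be appended to a spark-submit command line; the same keys could equally be placed in spark-defaults.conf.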