What is the expected behavior of the spark.streaming.backpressure.pid.minRate property?

New Contributor

I have a Spark Streaming application that reads messages from Kafka using the direct stream (not receiver-based) approach and processes messages per partition in YARN cluster mode.

 

In my Kafka partition, a batch of 2000 messages sometimes takes 20 seconds to process, while at other times the same number of messages takes 7-9 seconds.

 

Given the fluctuation, we turned on the backpressure settings as follows.

 

spark.batch.duration=10 seconds
spark.streaming.kafka.maxRatePerPartition=200

spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=60
spark.streaming.backpressure.pid.minRate=1600

We also specified the rate estimator with the following parameters. I don't understand the mathematics of PID, but I tried different combinations; one of them is as follows.

 

spark.streaming.backpressure.rateEstimator=pid
spark.streaming.backpressure.pid.minRate=1600
spark.streaming.backpressure.pid.integral=1
spark.streaming.backpressure.pid.proportional=25
spark.streaming.backpressure.pid.derived=1

Initially, Spark reads 2000 messages per partition into the RDD, but after some time it starts reading 800 records, which I think is minRate/2, and then it stays static. In the logs, it always prints 1600 as the new rate.

2017-01-20 14:55:14 TRACE PIDRateEstimator:67 - New rate = 1600.0

Given my scenario, I have a few questions:

  1. Is spark.streaming.backpressure.pid.minRate a per-partition value, or the total number of messages to be read per batch?
  2. Why does it read 800 messages instead of the 1600 defined in spark.streaming.backpressure.pid.minRate?
  3. Are there any suggested parameters that reduce the input rate when processing takes long and raise it back to something close to maxRatePerPartition when processing is fast? In my example, the input rate started at 2000, and when processing took about 20 seconds on average it dropped to 800. But when 800 messages were processed in 3-4 seconds, it did not go back up to 1600 or more. This wastes time and lowers throughput.
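
For readers wondering why the log keeps printing 1600, here is a simplified Python sketch of a PID-style rate update with a lower bound. It is modeled on my reading of Spark's PIDRateEstimator and should be treated as an approximation, not Spark's code. The class name `PidRateSketch`, its field names, and the sample batch timings are illustrative assumptions, not measurements from this job. The default gains shown (1.0, 0.2, 0.0) and the default floor of 100 are the documented Spark defaults as far as I know.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PidRateSketch:
    """Illustrative PID rate update with a floor (hypothetical, not Spark's API)."""
    batch_interval_ms: float
    proportional: float = 1.0
    integral: float = 0.2
    derivative: float = 0.0
    min_rate: float = 100.0
    latest_time_ms: float = -1.0
    latest_rate: float = -1.0
    latest_error: float = -1.0
    first_run: bool = True

    def compute(self, time_ms: float, num_elements: int,
                processing_delay_ms: float,
                scheduling_delay_ms: float) -> Optional[float]:
        # Ignore empty or out-of-order batch reports.
        if not (time_ms > self.latest_time_ms
                and num_elements > 0 and processing_delay_ms > 0):
            return None
        delay_since_update_s = (time_ms - self.latest_time_ms) / 1000.0
        # Records per second the job actually managed in this batch.
        processing_rate = num_elements / processing_delay_ms * 1000.0
        # Positive error means the previous rate was higher than the job could handle.
        error = self.latest_rate - processing_rate
        # Backlog implied by time spent waiting in the scheduling queue.
        historical_error = (scheduling_delay_ms * processing_rate
                            / self.batch_interval_ms)
        d_error = (error - self.latest_error) / delay_since_update_s
        raw_rate = (self.latest_rate
                    - self.proportional * error
                    - self.integral * historical_error
                    - self.derivative * d_error)
        # The floor: the result can never drop below min_rate.
        new_rate = max(raw_rate, self.min_rate)
        self.latest_time_ms = time_ms
        if self.first_run:
            # The first report only seeds the state; no rate is published.
            self.latest_rate = processing_rate
            self.latest_error = 0.0
            self.first_run = False
            return None
        self.latest_rate = new_rate
        self.latest_error = error
        return new_rate


# Gains and floor from the question; batch timings below are made up.
est = PidRateSketch(batch_interval_ms=10_000, proportional=25,
                    integral=1, derivative=1, min_rate=1600)
first = est.compute(10_000, 2000, 20_000, 0)        # seeds state only
slow = est.compute(20_000, 2000, 20_000, 10_000)    # slow batch with backlog
fast = est.compute(30_000, 800, 3_500, 0)           # fast batch
print(first, slow, fast)
```

In this sketch both the slow and the fast batch end up at the floor. The measured processing rate (a few hundred records per second) is well below min_rate, and the large proportional gain pushes the raw update far below zero, so `max(...)` pins the result at 1600. Under these assumptions, a floor set above what the job can actually process leaves the estimator no room to move in either direction.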


2 Replies

Contributor

Hi,

 

I am facing the same problem. Did you find a solution?

 

Best,

Helmi Khalifa

New Contributor

I tried spark.streaming.backpressure.pid.minRate, and it works as expected.

My configuration:

 

 

 

  spark.shuffle.service.enabled: "true"
  spark.streaming.kafka.maxRatePerPartition: "600"
  spark.streaming.backpressure.enabled: "true"
  spark.streaming.concurrentJobs: "1"
  spark.executor.extraJavaOptions: "-XX:+UseConcMarkSweepGC"
  batch.duration: 5
  spark.streaming.backpressure.pid.minRate: 2000

 

 

 

So, with this configuration it starts at 15 (total number of partitions) x 600 (maxRatePerPartition) x 5 (batch duration) = 45,000 records per batch, but it is not able to process that many records in 5 seconds. It then drops to ~10,000 = 2000 (pid.minRate) x 5 (batch duration).
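
The arithmetic above can be written out as a small Python sketch. The function names are hypothetical helpers for illustration only; the numbers are the ones from this reply, and treating pid.minRate as a total across all partitions is the conclusion of this reply rather than something the sketch proves.

```python
def max_records_per_batch(partitions: int, max_rate_per_partition: int,
                          batch_seconds: int) -> int:
    """Upper bound per batch: every partition reads at maxRatePerPartition."""
    return partitions * max_rate_per_partition * batch_seconds


def floor_records_per_batch(min_rate_total: int, batch_seconds: int) -> int:
    """Lower bound per batch, assuming pid.minRate is a total records/second."""
    return min_rate_total * batch_seconds


ceiling = max_records_per_batch(partitions=15, max_rate_per_partition=600,
                                batch_seconds=5)
floor = floor_records_per_batch(min_rate_total=2000, batch_seconds=5)
print(ceiling)  # 45000
print(floor)    # 10000
```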

 

Screenshot 2020-02-24 at 11.21.30 PM 3.png

 

So, spark.streaming.backpressure.pid.minRate is the total number of records per second.

 

Just set spark.streaming.backpressure.pid.minRate and leave the following configs at their defaults:

spark.streaming.backpressure.pid.integral
spark.streaming.backpressure.pid.proportional
spark.streaming.backpressure.pid.derived
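
As a rough illustration of that advice, the Python sketch below collects only the backpressure-related settings from this reply and renders them as `--conf key=value` arguments for spark-submit. The helper name `to_submit_args` is hypothetical, and the values are the ones from this reply, not a general recommendation. The three PID gains are deliberately left out so that they keep their defaults.

```python
# Settings taken from this reply; the PID gain properties are omitted on purpose.
backpressure_conf = {
    "spark.streaming.backpressure.enabled": "true",
    "spark.streaming.kafka.maxRatePerPartition": "600",
    "spark.streaming.backpressure.pid.minRate": "2000",
}


def to_submit_args(conf: dict) -> list:
    """Render a config mapping as a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(backpressure_conf)))
```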