Spark Streaming Graceful shutdown

Spark Streaming jobs are long-running and are expected to run 24/7, but there are cases, such as upgrading the code or updating the Spark configuration, where the application must be stopped deliberately. In a distributed program, killing processes one by one is not practical, so it is important to shut down the whole application gracefully.

There are two ways to shut down streaming applications:

  1. Non-graceful shutdown
  2. Graceful shutdown

1. Non-graceful shutdown

A non-graceful shutdown stops the application forcibly, which can cause problems. For example, if the data source is Kafka and a batch of data has already been loaded into Spark Streaming for processing when the application is forced to stop, that data may be consumed again or lost at the next startup.

 

Disadvantages:

  • Spark Streaming works on a micro-batch mechanism: an RDD is generated for every batch interval. If the application is killed during an interval, the data of that interval may be lost or processed twice (for example, data that was consumed from Kafka but not yet fully processed when the application was killed may be lost or reprocessed after the next startup). Spark provides a checkpoint mechanism that is used for recovery when the program restarts, but when the program code changes, the checkpoint must be deleted, so the risk of loss remains.
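To make the two failure modes concrete, here is a minimal, self-contained simulation in plain Python (no Spark or Kafka involved). It assumes a simplified consumer that reads fixed-size batches from a list and keeps a single committed offset; the function name run_with_kill and its parameters are made up for this illustration. Whether a kill in the middle of a batch causes loss or duplication depends on whether the offset was committed before or after the batch was processed.

```python
def run_with_kill(records, batch_size, commit_first, kill_in_batch):
    """Simulate one run that is killed mid-batch, followed by a clean restart.

    Returns everything the downstream sink received across both runs.
    """
    sink = []       # what the downstream system has received
    committed = 0   # offset that a restart resumes from

    def process(start):
        sink.extend(records[start:start + batch_size])

    # First run: ends abruptly in the middle of batch number `kill_in_batch`.
    offset, batch_no = 0, 0
    while offset < len(records):
        end = min(offset + batch_size, len(records))
        if batch_no == kill_in_batch:
            if commit_first:
                committed = end   # offset committed, batch never processed
            else:
                process(offset)   # batch processed, offset never committed
            break                 # the process is killed here
        process(offset)
        committed = end
        offset, batch_no = end, batch_no + 1

    # Second run: resume from the committed offset and run to completion.
    offset = committed
    while offset < len(records):
        process(offset)
        offset = min(offset + batch_size, len(records))
    return sink


if __name__ == "__main__":
    records = list(range(6))
    # Killed after processing but before committing: batch [2, 3] is duplicated.
    print(run_with_kill(records, 2, commit_first=False, kill_in_batch=1))
    # Killed after committing but before processing: batch [2, 3] is lost.
    print(run_with_kill(records, 2, commit_first=True, kill_in_batch=1))
```

A graceful shutdown avoids both outcomes because the stop only happens at a batch boundary, where the processed data and the committed offset agree.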

A non-graceful shutdown can be performed in two ways:

  • kill -9 PID
  • yarn application -kill applicationId

These commands stop the streaming application, but possibly in the middle of a batch, so data may be lost or duplicated.

 

To solve this, we need to implement a Graceful Shutdown process to ensure that the Spark Streaming Application will only shut down between micro-batches, so we don’t lose any data.

2. Graceful shutdown

A Graceful shutdown means completing the current data processing, stopping receiving new data, and terminating the application. It will ensure that Spark jobs shut down in a recoverable state and without any data loss or duplication.

 

A graceful shutdown can improve application reliability since it guarantees the execution of pending tasks and reduces data loss that could be produced by the immediate context stop.

 

Advantages of graceful shutdown:

There are several advantages of using graceful shutdown in Spark Streaming:

  1. Prevent data loss: When a Spark Streaming application is shut down abruptly, it can lead to data loss as the application may not have processed all of the data in the current batch. Graceful shutdown ensures that the application completes the processing of the current batch before shutting down, reducing the risk of data loss.

  2. Reduce errors: Abrupt shutdown of a Spark Streaming application can result in errors such as lost partitions, incomplete batch processing, and other errors. Graceful shutdown ensures that the application stops processing new data in a controlled way, allowing it to complete its current processing and reduce the likelihood of errors.

  3. Better resource management: Graceful shutdown ensures that the Spark Streaming application releases all of its resources in a controlled manner, preventing resource leaks and allowing for better resource management.

  4. Improved performance: Because the application completes its current batch before shutting down, the work already done on that batch is not thrown away and repeated after the restart.

  5. Easier recovery: In case of errors or failures, a gracefully shut down Spark Streaming application can be recovered more easily than one that was abruptly shut down. The application can be restarted and continue processing from the last successfully processed batch, reducing the amount of data that needs to be reprocessed.

In Spark Streaming, there are a few steps to follow for a graceful shutdown:

  1. Stop receiving data: The first step is to stop receiving data from the input sources. This can be done by calling the stop() method on the StreamingContext object.

  2. Wait for pending batches to complete: Once you have stopped receiving data, you should wait for all the pending batches to complete processing. You can use the awaitTerminationOrTimeout() method on the StreamingContext object to wait for a specified amount of time for the pending batches to complete.

  3. Stop the streaming context: If all the pending batches have completed processing within the specified timeout period, you can stop the StreamingContext object by calling the stop() method with the stopGracefully flag set to true. This will ensure that all the output sinks are flushed before the context is stopped.

  4. Handle any exceptions: If there are any exceptions or errors during the shutdown process, you should handle them appropriately. You can use try-catch blocks to handle any exceptions that may occur.
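The stop and exception-handling steps above can be sketched as a small helper. This is only a sketch under stated assumptions: ssc is assumed to be a StreamingContext (the helper itself does not import Spark and only requires an object with a compatible stop method), and the helper name stop_streaming_gracefully and the logger name are made up for this example.

```python
import logging

logger = logging.getLogger("graceful-shutdown")  # hypothetical logger name


def stop_streaming_gracefully(ssc):
    """Stop a StreamingContext-like object gracefully; return True on success.

    `ssc` is expected to expose stop(stopSparkContext, stopGracefully),
    as StreamingContext does. The arguments are passed positionally.
    """
    try:
        logger.info("Gracefully stopping Spark Streaming Application.")
        # First True: also stop the underlying SparkContext.
        # Second True: let pending batches finish before stopping.
        ssc.stop(True, True)
        logger.info("The Spark Streaming Application has been successfully stopped.")
        return True
    except Exception:
        # Record the failure instead of letting it escape from the shutdown path.
        logger.exception("Graceful stop failed.")
        return False
```

Returning a boolean instead of re-raising is a design choice for this sketch; a real application might prefer to exit with a non-zero status when the stop fails.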

Different ways to shutdown/stop the Spark Streaming Application gracefully

There are several ways to gracefully shutdown a Spark Streaming application:

  1. Explicitly calling the JVM Shutdown Hook in the driver program (Not Recommended)

  2. Using spark.streaming.stopGracefullyOnShutdown = true (Okay)

  3. Use an external approach to control internal program shutdown (Recommended)

  4. Expose the interface to the outside world and provide the shutdown function (Recommended)

In this article, we will discuss the first two methods. 

1. Explicitly calling the JVM Shutdown Hook in the driver program

The easiest way to shut down the Spark Streaming application gracefully is by calling the streamingContext stop() method inside a JVM shutdown hook. A shutdown hook is a thread that is registered with the JVM to run when the JVM is about to shut down, for example when all non-daemon threads have exited, System.exit was called, or Ctrl+C was typed.

 

Using Java

Use Runtime.getRuntime().addShutdownHook() to register the shutdown hook, and call the stop method of the streaming context before the JVM exits to perform a graceful shutdown.

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.info("Gracefully stopping Spark Streaming Application.");
    streamingContext.stop(true, true);
    logger.info("The Spark Streaming Application has been successfully stopped.");
}));

This ensures that, when the process is terminated with a catchable signal, ssc.stop is called before the driver exits, so that the data in flight is processed.

Using Scala

sys.addShutdownHook {
  logger.info("Gracefully stopping Spark Streaming Application.")
  streamingContext.stop(stopSparkContext = true, stopGracefully = true)
  logger.info("The Spark Streaming Application has been successfully stopped.")
}

Using Python

import signal
import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

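def stop_gracefully(ssc):
    print("Gracefully stopping Spark Streaming Application.")
    # Positional arguments: stop the SparkContext too, and stop gracefully.
    # (PySpark spells the second keyword stopGraceFully, so stopGracefully=... raises a TypeError.)
    ssc.stop(True, True)
    print("The Spark Streaming Application has been successfully stopped.")
    sys.exit(0)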

if __name__ == '__main__':

    # Create Spark Config
    sparkConf = SparkConf().setAppName("SparkStreamingGracefulShutdownHookApp")

    # Create Spark Streaming context (PySpark expects a SparkContext and a batch interval in seconds)
    sc = SparkContext(conf=sparkConf)
    ssc = StreamingContext(sc, 30)

    # Register the shutdown hook
    signal.signal(signal.SIGTERM, lambda sig, frame: stop_gracefully(ssc))
    signal.signal(signal.SIGINT, lambda sig, frame: stop_gracefully(ssc))

    # Start processing data
    ssc.start()
    ssc.awaitTermination()

Note: This approach is recommended only for Spark version < 1.4.

Drawbacks:

There are some drawbacks to the current approach:

  1. There is a possibility that a deadlock situation will occur.
  2. There is no guarantee that a shutdown hook will be called by the JVM at all.

2. Using spark.streaming.stopGracefullyOnShutdown = true

Adding the same shutdown-hook code to every program, as in the first approach, is repetitive and tedious.

From Spark version 1.4 onwards, Spark has a built-in spark.streaming.stopGracefullyOnShutdown parameter that decides whether to stop the streaming application gracefully (see SPARK-7776 for details). By default, the value of spark.streaming.stopGracefullyOnShutdown is false.


When spark.streaming.stopGracefullyOnShutdown is set to true, the Spark Streaming application will be stopped gracefully when a shutdown signal is received. This means that the application will complete its current batches and clean up resources before shutting down.

 

To use the spark.streaming.stopGracefullyOnShutdown parameter, it must be set in the Spark configuration either via command-line arguments or a properties file or programmatically using the SparkConf object.

 

spark-submit:

spark-submit --conf spark.streaming.stopGracefullyOnShutdown=true ...

Scala:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val sparkConf = new SparkConf().setAppName("SparkStreamingGracefulShutdownSignalApp").set("spark.streaming.stopGracefullyOnShutdown", "true")
val sc = new SparkContext(sparkConf)

Python:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

sparkConf = SparkConf().setAppName("SparkStreamingGracefulShutdownSignalApp").set("spark.streaming.stopGracefullyOnShutdown", "true")
sc = SparkContext(conf=sparkConf)

When using this approach, do not also register the explicit shutdown hook from the first approach or call ssc.stop() in the driver. Just set this parameter and then call ssc.start() and ssc.awaitTermination(); otherwise, the application might hang during shutdown.

 

Points to note:

  1. By default, spark.yarn.maxAppAttempts takes its value from yarn.resourcemanager.am.max-attempts in YARN, whose default is 2. So, after the kill command stops the first AM, YARN automatically starts another AM/driver, and you have to kill the second one as well. You can set --conf spark.yarn.maxAppAttempts=1 when submitting the application, but then a failed AM is not retried, which weakens failure recovery.

  2. Killing the application with the yarn application -kill <applicationId> command is not recommended with this method. The command sends a SIGTERM signal to the container, followed shortly by a SIGKILL signal. The interval between the two is determined by yarn.nodemanager.sleep-delay-before-sigkill.ms (default 250 ms). Even when the value is set to 60000 ms, it still does not work properly. The log contains only the following lines:

 

23/03/21 17:03:36 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
23/03/21 17:03:36 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
23/03/21 17:03:36 INFO scheduler.ReceiverTracker: Sent stop signal to all 1 receivers
23/03/21 17:03:36 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver

 

  • The spark.streaming.stopGracefullyOnShutdown parameter depends on another parameter, spark.streaming.gracefulStopTimeout (in ms), which defaults to 10 times the batch interval. If processing has not completed after this time, the application is forced to stop.
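As a quick worked example of that default, the arithmetic for a 30-second batch interval looks like this. The helper name default_graceful_stop_timeout_ms is made up for illustration and simply encodes the "10 times the batch interval" rule stated above.

```python
def default_graceful_stop_timeout_ms(batch_interval_ms: int) -> int:
    """Default graceful stop timeout as described above: 10 x the batch interval."""
    return 10 * batch_interval_ms


batch_interval_ms = 30 * 1000  # a 30-second batch interval
print(default_graceful_stop_timeout_ms(batch_interval_ms))  # 300000 ms, i.e. 5 minutes
```

If batches regularly take longer than this to drain, setting spark.streaming.gracefulStopTimeout explicitly to a larger value may be worth considering.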

Drawbacks:

There are a few drawbacks to this approach:

  1. The shutdown signal can be sent only from the machine on which the driver program is running, not from any other node in the Spark cluster.
  2. The spark.yarn.maxAppAttempts parameter must be set to 1.

Implementation

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._

// nc -lk 9999
object SparkStreamingGracefulShutdownSignalApp extends App with Serializable {

  private val appName = getClass.getSimpleName.replace("$", "") // App Name
  @transient private lazy val logger: Logger = Logger.getLogger(appName)

  private val checkpointDirectory = s"/tmp/streaming/$appName/checkpoint" // Checkpoint Directory
  private val batchInterval: Long = 30 * 1000 // 30 seconds batch interval

  if (args.length < 2) {
    logger.error(s"Usage\t: $appName <hostname> <port>")
    logger.info(s"Example\t: $appName localhost 9999")
    System.exit(1)
  }

  private def createContext(hostname: String, port: Int): StreamingContext = {

    // Creating the SparkConf object
    val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")
    sparkConf.set("spark.yarn.maxAppAttempts", "1")
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    //sparkConf.set("spark.streaming.gracefulStopTimeout", (10 * batchInterval).toString)

    // Creating the StreamingContext object
    logger.info(s"Creating StreamingContext with duration $batchInterval milliseconds ...")
    val ssc = new StreamingContext(sparkConf, Milliseconds(batchInterval))
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory
    logger.info("StreamingContext created successfully ...")

    // Create a socket stream on target hostname:port
    val lines = ssc.socketTextStream(hostname, port)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc
  }

  // Get StreamingContext from checkpoint data or create a new one
  private val Array(hostname, port) = args
  logger.info(s"Hostname $hostname and Port $port ...")

  private val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(hostname, port.toInt))
  ssc.start()
  logger.info("StreamingContext Started ...")

  //Waiting for task termination
  ssc.awaitTermination()

}

Spark Submit Command

cat run_spark_streaming_graceful_shutdown_app.sh

#!/bin/bash
echo "Running <$0> script"

HOST_NAME=$(hostname -f)
PORT=9999

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 2g \
  --num-executors 2 \
  --executor-cores 2 \
  --conf spark.dynamicAllocation.enabled=false \
  --name SparkStreamingGracefulShutdownSignalApp \
  --class SparkStreamingGracefulShutdownSignalApp \
  /tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar "$HOST_NAME" "$PORT"

echo "Finished <$0> script"

How to pass the shutdown signal:

To shut down the Spark Streaming application, we need to pass the shutdown signal explicitly. (Note: Do not use -9 to force it to close; otherwise, the shutdown hook cannot capture the signal.)

There are multiple ways we can pass the shutdown signal:

  1. Use Ctrl + C command at the terminal (It will work only in client mode).

  2. Find the driver process and kill that process by sending the SIGTERM(15) signal.

    • Go to Spark UI > Go to Executors tab > Find Driver host

    • Log in to the driver machine found above and run the following command to find the PID of the driver process.

      ps -ef | grep java | grep ApplicationMaster | grep <applicationId>

      Note: Replace <applicationId> in the above command with your application ID.

      For Example:

      ps -ef | grep java | grep ApplicationMaster | grep application_1679416741274_0004

    • Run the following command to kill the process.

      ps -ef | grep java | grep ApplicationMaster | grep <applicationId> | awk '{print $2}' | xargs kill -SIGTERM

      Note: We can use either SIGTERM or 15 while killing the process.

      For Example:

      ps -ef | grep java | grep ApplicationMaster | grep application_1679416741274_0004 | awk '{print $2}' | xargs kill -SIGTERM
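The same find-and-signal pipeline can also be sketched in Python, for cases where a script is easier to maintain than a shell one-liner. This is a sketch under assumptions: it must run on the driver host, it relies on the standard `ps -ef` column layout (PID in the second column), and the function names find_driver_pids and send_sigterm_to_driver are made up for this example.

```python
import os
import signal
import subprocess
from typing import List


def find_driver_pids(ps_output: str, application_id: str) -> List[int]:
    """Return PIDs of `ps -ef` lines that mention java, ApplicationMaster and the application ID."""
    pids = []
    for line in ps_output.splitlines():
        if "java" in line and "ApplicationMaster" in line and application_id in line:
            pids.append(int(line.split()[1]))  # second column of `ps -ef` is the PID
    return pids


def send_sigterm_to_driver(application_id: str) -> List[int]:
    """Run `ps -ef` locally and send SIGTERM (15) to every matching driver process."""
    ps_output = subprocess.run(
        ["ps", "-ef"], capture_output=True, text=True, check=True
    ).stdout
    pids = find_driver_pids(ps_output, application_id)
    for pid in pids:
        os.kill(pid, signal.SIGTERM)  # SIGTERM, never SIGKILL, so the shutdown hook can run
    return pids
```

For example, calling send_sigterm_to_driver("application_1679416741274_0004") on the driver host would signal the same process as the shell command above and return the list of PIDs it signalled.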

 

Part 2 will cover the remaining approaches to shutting down the Spark Streaming application.

Comments
avatar

Thank you, waiting for part 2 

avatar

these considerations are the same for DStream and Structured Streaming?

avatar

When are you planning to publish part 2?

avatar
Master Collaborator

Hi @davidebelvedere, for Structured Streaming this needs to be implemented in a different way, i.e. using StreamingQueryListener.

I will try to publish Part 2 by the end of May.

avatar
New Contributor

We are migrating to CDP and we have some streaming jobs using Dstreams API.
Source is Kafka Topic with 3 partitions and Sink is Hbase. and offset commit is in Kafka - using the Kafka Integration
As per below documentation 
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

We do this after the actual data ingestion completion to the Hbase table

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

Kafka configs -

 

"security.protocol": "SASL_SSL",
"sasl.kerberos.service.name": "kafka",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG": "StringDeserializer.class"
"auto.offset.reset": "earliest",
"group.id": "STREAMING_JOB_GROUPID_101",
"enable.auto.commit": "false"

 


streaming configurations

 

"spark": {
"sparkConf": {
"spark.app.name": "STREAMING_JOB_APPLICATION_NAME",
"spark.streaming.stopGracefullyOnShutdown": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.streaming.kafka.maxRatePerPartition": "12"

}
"streamingContext": {
"batchMilliSeconds": 10000
}

 


Also we store the micro batch offset information () of every micro-batch in a table in Hbase - as log information 

 

rdd.asInstanceOf[HasOffsetRanges].offsetRanges

 


However, our observation is that when we kill the application using "yarn application -kill <application id>", the committed offsets in the Kafka consumer group are ahead of the actual offsets that were processed in the last successful batch ingestion.
So when we restart the job, it starts from the offsets that were committed (ahead of the actual data ingested to Hbase). This is causing data loss for us.
Sample

 

Kafka Offsets stored
STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC    2          3035673         3089643         53970           -               -               -
STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC    1          3035678         3089648         53970           -               -               -
STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC    0          3035682         3089651         53969           -   

Actual offset - successful ingested batch
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 2, range: [3035553 -> 3035673])
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 0, range: [3035562 -> 3035682])
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 1, range: [3035558 -> 3035678])

 

Could you please elaborate on this behavior

avatar
New Contributor

By the way - when we change the configuration to 

"spark.streaming.stopGracefullyOnShutdown": "false",

And we kill the job,
We see that the commits are in sync in both the Kafka consumer group and our ingested offset information.

avatar
Master Collaborator

Hi @ravitinku18 

I suggest using the 3rd and 4th approaches. I will publish that article shortly.

avatar

Thanks @RangaReddy, do you have a guide to recommend that explains how to do it with Structured Streaming? I can't find much on the internet.

avatar
New Contributor

@RangaReddy Thanks for the article. When will you be publishing Part 2? Really eager to know the 3rd and 4th approaches. We are currently trying to run a Spark Streaming application in production, and it will be really helpful for the project.