Created on 04-24-2023 03:59 AM - edited 07-26-2023 05:32 AM
Spark Streaming jobs are long-running jobs and their tasks need to be executed 7*24 hours, but there are cases like upgrading the code or updating the Spark configuration, where the program needs to be actively stopped. However, for distributed programs, there is no way to kill processes one by one. It is very important to shut down all configurations gracefully.
There are two ways to shut down streaming applications:
A non-graceful shutdown is a violent shutdown process. Stopping Spark streaming violently may cause problems. For example, if your data source is Kafka, and a batch of data has been loaded into Spark streaming for processing. If it is forced to stop, consumption will be repeated or some data will be lost at the next startup.
Disadvantages:
Non-graceful shutdown in two ways:
The above command(s) will stop the streaming application but this could happen in the middle of the batch. Due to this, there is a possibility of data loss or data duplication.
To solve this, we need to implement a Graceful Shutdown process to ensure that the Spark Streaming Application will only shut down between micro-batches, so we don’t lose any data.
A Graceful shutdown means completing the current data processing, stopping receiving new data, and terminating the application. It will ensure that Spark jobs shut down in a recoverable state and without any data loss or duplication.
A graceful shutdown can improve application reliability since it guarantees the execution of pending tasks and reduces data loss that could be produced by the immediate context stop.
Advantages of graceful shutdown:
There are several advantages of using graceful shutdown in Spark Streaming:
Prevent data loss: When a Spark Streaming application is shut down abruptly, it can lead to data loss as the application may not have processed all of the data in the current batch. Graceful shutdown ensures that the application completes the processing of the current batch before shutting down, reducing the risk of data loss.
Reduce errors: Abrupt shutdown of a Spark Streaming application can result in errors such as lost partitions, incomplete batch processing, and other errors. Graceful shutdown ensures that the application stops processing new data in a controlled way, allowing it to complete its current processing and reduce the likelihood of errors.
Better resource management: Graceful shutdown ensures that the Spark Streaming application releases all of its resources in a controlled manner, preventing resource leaks and allowing for better resource management.
Improved performance: Graceful shutdown ensures that the Spark Streaming application completes its current batch processing before shutting down. This can lead to improved performance as the application has more time to optimize and complete its processing.
Easier recovery: In case of errors or failures, a gracefully shut down Spark Streaming application can be recovered more easily than one that was abruptly shut down. The application can be restarted and continue processing from the last successfully processed batch, reducing the amount of data that needs to be reprocessed.
In Spark Streaming, there are a few steps to follow for a graceful shutdown:
Stop receiving data: The first step is to stop receiving data from the input sources. This can be done by calling the stop() method on the StreamingContext object.
Wait for pending batches to complete: Once you have stopped receiving data, you should wait for all the pending batches to complete processing. You can use the awaitTerminationOrTimeout() method on the StreamingContext object to wait for a specified amount of time for the pending batches to complete.
Stop the streaming context: If all the pending batches have completed processing within the specified timeout period, you can stop the StreamingContext object by calling the stop() method with the gracefulStop() flag set to true. This will ensure that all the output sinks are flushed before the context is stopped.
Handle any exceptions: If there are any exceptions or errors during the shutdown process, you should handle them appropriately. You can use try-catch blocks to handle any exceptions that may occur.
There are several ways to gracefully shutdown a Spark Streaming application:
Explicitly calling the JVM Shutdown Hook in the driver program (Not Recommended)
Using spark.streaming.stopGracefullyOnShutdown = true (Okay)
Use an external approach to control internal program shutdown (Recommended)
Expose the interface to the outside world and provide the shutdown function (Recommended)
In this article, we will discuss the first two methods.
1. Explicitly calling the JVM Shutdown Hook in the driver programThe easiest way to shut down the Spark Streaming application gracefully is by calling the streamingContext stop() method inside a JVM Shutdown Hook. A shutdown hook is a thread that is registered with the JVM to run when the JVM is about to shut down. JVM will shut down. For example, all non-daemon threads exited, System.exit was called or CTRL+C was typed.
Using Java
Use Runtime.getRuntime().addShutdownHook() to register the shutdown hook method, and call the stop method of spark before the JVM exits to perform an elegant shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Gracefully stopping Spark Streaming Application.");
streamingContext.stop( true, true);
logger.info("The Spark Streaming Application has been successfully stopped.");
}));
This can ensure that before the process is killed and before the driver ends, ssc.stop will be called to ensure that the data is processed.
Using Scala
sys.addShutdownHook {
logger.info("Gracefully stopping Spark Streaming Application.")
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
logger.info("The Spark Streaming Application has been successfully stopped.")
}
Using Python
import signal
import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
def stop_gracefully(ssc):
print("Gracefully stopping Spark Streaming Application.")
ssc.stop(stopSparkContext=True, stopGracefully=True)
print("The Spark Streaming Application has been successfully stopped.")
sys.exit(0)
if __name__ == '__main__':
# Create Spark Config
sparkConf = SparkConf().setAppName("SparkStreamingGracefulShutdownHookApp")
# Create Spark Streaming context
ssc = StreamingContext(sparkConf, 30 * 1000)
# Register the shutdown hook
signal.signal(signal.SIGTERM, lambda sig, frame: stop_gracefully(ssc))
signal.signal(signal.SIGINT, lambda sig, frame: stop_gracefully(ssc))
# Start processing data
ssc.start()
ssc.awaitTermination()
Note: This approach is recommended only for Spark version < 1.4.
Drawbacks:
There are some drawbacks to the current approach:
2. Using spark.streaming.stopGracefullyOnShutdown = true
Solution1 will have too much trouble adding such repetitive codes in each program.
From Spark version 1.4 onwards, Spark has builtin spark.streaming.stopGracefullyOnShutdown parameters to decide whether to close the Streaming program in a graceful way (see SPARK-7776 for details). By default spark.streaming.stopGracefullyOnShutdown parameter value is false.
When spark.streaming.stopGracefullyOnShutdown is set to true, the Spark Streaming application will be stopped gracefully when a shutdown signal is received. This means that the application will complete its current batches and clean up resources before shutting down.
To use the spark.streaming.stopGracefullyOnShutdown parameter, it must be set in the Spark configuration either via command-line arguments or a properties file or programmatically using the SparkConf object.
spark-submit:
spark-submit --conf spark.streaming.stopGracefullyOnShutdown=true ...
Scala:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
val sparkConf = new SparkConf().setAppName("SparkStreamingGracefulShutdownSignalApp").set("spark.streaming.stopGracefullyOnShutdown", "true")
val sc = new SparkContext(sparkConf)
Python:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
sparkConf = SparkConf().setAppName("SparkStreamingGracefulShutdownSignalApp").set("spark.streaming.stopGracefullyOnShutdown", "true")
sc = SparkContext(conf=sparkConf)
By using this approach, we should not use the first explicit shutdown hook approach or call the ssc.stop() method in the driver along with this parameter. We can just set this parameter and then call methods ssc.start() and ssc.awaitTermination(). No need to call ssc.stop() method. Otherwise, the application might hang during the shutdown.
Matters needing attention:
By default, the spark.yarn.maxAppAttempts parameter uses the default value from yarn.resourcemanager.am.max-attempts value in Yarn. The default value is 2. So, after the kill command stops the first AM, YARN will automatically start another AM/driver. You have to kill the second one. You can set --conf spark.yarn.maxAppAttempts=1 during the spark submission process, but if this parameter is set to 1, there will be a risk that AM will fail and not retry, and the disaster recovery effect will become worse.
This method is not recommended to kill the application using the Yarn application -kill <applicationid> command. This command will send a SIGTERM signal to the container, but then a SIGTERM signal will be sent. This time interval is yarn.nodemanager.sleep-delay-before-sigkill.ms determined by (default 250ms). If the value is set to 60000ms, it still cannot work normally. The log only has these two lines, as follows:
23/03/21 17:03:36 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
23/03/21 17:03:36 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
23/03/21 17:03:36 INFO scheduler.ReceiverTracker: Sent stop signal to all 1 receivers
23/03/21 17:03:36 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
The spark.streaming.stopGracefullyOnShutdown parameter will depend on this parameter: spark.streaming.gracefulStopTimeout (unit ms), the default is 10 times the interval between batches. When the processing is not completed after this time, it is forced to stop.
Drawbacks:
There are a few drawbacks to this approach:
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
// nc -lk 9999
object SparkStreamingGracefulShutdownSignalApp extends App with Serializable {
private val appName = getClass.getSimpleName.replace("$", "") // App Name
@transient private lazy val logger: Logger = Logger.getLogger(appName)
private val checkpointDirectory = s"/tmp/streaming/$appName/checkpoint" // Checkpoint Directory
private val batchInterval: Long = 30 * 1000 // 30 seconds batch interval
if (args.length < 2) {
logger.error(s"Usage\t: $appName <hostname> <port>")
logger.info(s"Example\t: $appName localhost 9999")
System.exit(1)
}
private def createContext(hostname: String, port: Int): StreamingContext = {
// Creating the SparkConf object
val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")
sparkConf.set("spark.yarn.maxAppAttempts", "1")
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
//sparkConf.set("spark.streaming.gracefulStopTimeout", (10 * batchInterval).toString)
// Creating the StreamingContext object
logger.info(s"Creating StreamingContext with duration $batchInterval milli seconds ...")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchInterval))
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
logger.info("StreamingContext created successfully ...")
// Create a socket stream on target hostname:port
val lines = ssc.socketTextStream(hostname, port)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc
}
// Get StreamingContext from checkpoint data or create a new one
private val Array(hostname, port) = args
logger.info(s"Hostname $hostname and Port $port ...")
private val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(hostname, port.toInt))
ssc.start()
logger.info("StreamingContext Started ...")
//Waiting for task termination
ssc.awaitTermination()
}
Spark Submit Command
cat run_spark_streaming_graceful_shutdown_app.sh
#!/bin/bash
echo "Running <$0> script"
HOST_NAME=$(hostname -f)
PORT=9999
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 2g \
--num-executors 2 \
--executor-cores 2 \
--conf spark.dynamicAllocation.enabled=false \
--name SparkStreamingGracefulShutdownSignalApp \
--class SparkStreamingGracefulShutdownSignalApp \
/tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar "$HOST_NAME" "$PORT"
echo "Finished <$0> script"
How to pass the shutdown signal:
To shut down the Spark Streaming application, we need to pass the shutdown signal explicitly. (Note: Do not use -9 to force it to close; otherwise, the hook cannot capture.)
There are multiple ways we can pass the shutdown signal:
Use Ctrl + C command at the terminal (It will work only in client mode).
Find the driver process and kill that process by sending the SIGTERM(15) signal.
Go to Spark UI > Go to Executors tab > Find Driver host
Log in to the above mentioned driver machine address and run the following command to find out the PID process.
ps -ef | grep java | grep ApplicationMaster | grep <applicationId>
Note: You need to replace your applicationId in the above command.
For Example:
ps -ef | grep java | grep ApplicationMaster | grep application_1679416741274_0004
Run the following command to kill the process.
ps -ef | grep java | grep ApplicationMaster | grep <applicationId> | awk '{print $2}' | xargs kill -SIGTERM
Note: We can use either SIGTERM or 15 while killing the process.
For Example:
ps -ef | grep java | grep ApplicationMaster | grep application_1679416741274_0004 | awk '{print $2}' | xargs kill -SIGTERM
In Part2, remaining approaches to shut down the Spark streaming application will be covered.
Created on 05-04-2023 01:59 AM
Thank you, waiting for part 2
Created on 05-04-2023 02:59 AM - edited 05-04-2023 03:43 AM
these considerations are the same for DStream and Structured Streaming?
Created on 05-08-2023 03:22 AM
When are you planning to publish part 2?
Created on 05-08-2023 08:19 PM
Hi @davidebelvedere For Structured Streaming we need to implement in different way i.e using StreamingQueryListener.
Part2 i will try to publish in May end.
Created on 05-10-2023 07:31 AM
We are migrating to CDP and we have some streaming jobs using Dstreams API.
Source is Kafka Topic with 3 partitions and Sink is Hbase. and offset commit is in Kafka - using the Kafka Integration
As per below documentation
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
We do this after the actual data ingestion completion to the Hbase table
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
Kafka configs -
"security.protocol": "SASL_SSL",
"sasl.kerberos.service.name": "kafka",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG": "StringDeserializer.class"
"auto.offset.reset": "earliest",
"group.id": "STREAMING_JOB_GROUPID_101",
"enable.auto.commit": "false"
streaming configurations
"spark": {
"sparkConf": {
"spark.app.name": "STREAMING_JOB_APPLICATION_NAME",
"spark.streaming.stopGracefullyOnShutdown": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.streaming.kafka.maxRatePerPartition": "12"
}
"streamingContext": {
"batchMilliSeconds": 10000
}
Also we store the micro batch offset information () of every micro-batch in a table in Hbase - as log information
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
How ever -- our observation is when we kill the application using "yarn application -kill <application id>
The committed offsets in the Kafka - consumer group is -- ahead of of the actual offsets that were processed in the last successful batch ingestion.
So when we restart the job - it is starting from the offsets that were committed (ahead of actual data ingested to Hbase) - This is causing data loss for us
Sample
Kafka Offsets stored
STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC 2 3035673 3089643 53970 - - -
STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC 1 3035678 3089648 53970 - - -
STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC 0 3035682 3089651 53969 -
Actual offset - successful ingested batch
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 2, range: [3035553 -> 3035673])
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 0, range: [3035562 -> 3035682])
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 1, range: [3035558 -> 3035678])
Could you please elaborate on this behavior
Created on 05-10-2023 07:39 AM
By the way - when we change the configuration to
"spark.streaming.stopGracefullyOnShutdown": "false",
And we kill the job,
We see that the commits are in Sync in both Kafka consume group and our ingested offset information.
Created on 05-14-2023 09:38 PM
Hi @ravitinku18
I will suggest to use 3rd and 4th approach. I will publish that article shortly.
Created on 05-15-2023 05:38 AM
Thanks @RangaReddy, do you have a guide to reccomend that explains how to do it with structured streaming? I can't find much on internet
Created on 05-29-2023 10:32 PM
@RangaReddyThanks for the article. When will be publishing part 2. Really eager to know 3rd and 4th approach. currently trying to run a spark streaming application in production. It will be really helpful for the project.
Created on 07-26-2023 05:28 AM