Created on 07-19-2023 02:27 AM
In the first part of this blog post, we discussed the importance of graceful shutdown in Spark Streaming applications, and we looked at several ways to shut down a Spark Streaming application gracefully.
In this second part, we will discuss better ways to gracefully shut down a streaming application, using two further approaches: a marker file and an HTTP service.
This approach allows an external process to control the shutdown of the streaming application by sending it a marker or signal, which the application watches for. Once the marker is detected, the application initiates a graceful shutdown procedure, ensuring that in-flight data is fully processed and resources are properly released before exiting.
Benefits:
Drawbacks:
2.1 Different Implementations:
There are different ways to implement a graceful Spark Streaming shutdown using this approach, for example by watching for a marker file on a file system or by exposing an HTTP endpoint in the driver.
Each of these implementations has its advantages and disadvantages, and the best choice depends on the specific requirements of the use case.
2.2 Using a File System:
We can use any file system to create the marker file; in this example, we use HDFS.
Here are the steps to achieve graceful shutdown using the marker approach with HDFS:
1. In the driver, start a monitor that periodically checks whether the marker file exists on HDFS (in this example, StopByMarkerFileSystem.stopByMarkerFile).
2. When you want to stop the application, create the marker file from outside the application (for example, with hdfs dfs -touch).
3. Once the application detects the marker file, it stops the StreamingContext gracefully, deletes the marker file, and exits.
Implementation:
SparkStreamingKafkaGracefulShutdownMarkerApp.scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object SparkStreamingKafkaGracefulShutdownMarkerApp extends App with Serializable {
// Create a logger instance for logging messages
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
// Define AppName
private val appName = getClass.getSimpleName.replace("$", "")
if (args.length < 4) {
logger.error(s"Usage\t: $appName <bootstrapServers> <groupId> <topics> <markerFile>")
logger.info(s"Example\t: $appName <localhost:9092> <my_group> <test_topic> </tmp/graceful_shutdown>")
System.exit(1)
}
// Consume the first four command line arguments (an optional fifth argument sets the batch duration)
private val Array(bootstrapServers, groupId, topic, markerFile) = args.take(4)
// Create a SparkConf object
val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")
// Create StreamingContext, with batch duration in seconds
private val duration = if (args.length > 4) args(4).toInt else 30
logger.info(s"Creating StreamingContext with batch duration $duration seconds...")
private val batchDuration = Seconds(duration)
val streamingContext = new StreamingContext(sparkConf, batchDuration)
logger.info("StreamingContext created successfully ...")
// Define Kafka Parameters
private val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
// Create a set of topics from a string
private val topics: Set[String] = topic.split(",").map(_.trim).toSet
// Create a streaming context from Kafka
private val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// Get the lines of text from Kafka
val lines = stream.map(record => record.value())
// Split lines into words
val words = lines.flatMap(_.split(" "))
// Map every word to a tuple
private val wordMap = words.map(word => (word, 1))
// Count occurrences of each word
val wordCounts = wordMap.reduceByKey(_ + _)
// Print the word count
wordCounts.print()
// Start stream processing
streamingContext.start()
logger.info("StreamingContext Started ...")
// Gracefully shutdown the Spark Streaming application when the termination marker is received
StopByMarkerFileSystem.stopByMarkerFile(streamingContext, markerFile)
// Wait for the computation to terminate
streamingContext.awaitTermination()
}
StopByMarkerFileSystem.scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.Logger
import org.apache.spark.streaming.StreamingContext
object StopByMarkerFileSystem {
// Create a logger instance for logging messages
@transient private lazy val logger: Logger = Logger.getLogger(getClass.getSimpleName.replace("$", ""))
/**
* Stops the Spark Streaming context gracefully based on the presence of a marker file.
*
* @param ssc The StreamingContext to stop
* @param markerFile The path of the marker file indicating when to stop
*/
def stopByMarkerFile(ssc: StreamingContext, markerFile: String): Unit = {
var isStop = false
// Interval (in milliseconds) between checks for the marker file
val checkIntervalMillis = 10000
while (!isStop) {
if (checkMarkerFileExists(markerFile)) {
logger.info("Marker file exists, stopping the Spark Streaming Context gracefully ...")
// Stop the streaming context if it has not already terminated
ssc.stop(stopSparkContext = true, stopGracefully = true)
// Delete the marker file
deleteMarkerFile(markerFile)
logger.info("Spark Streaming Context is Stopped ...")
isStop = true
} else {
// Sleep between checks to avoid busy-polling the file system
Thread.sleep(checkIntervalMillis)
}
}
}
/**
* Checks if a marker file exists in the file system.
*
* @param markerFile The path of the marker file
* @return True if the marker file exists, false otherwise
*/
private def checkMarkerFileExists(markerFile: String): Boolean = {
// Create a new Path object with the provided marker file path
val path = new Path(markerFile)
// Get the FileSystem associated with the path
val fs = path.getFileSystem(new Configuration())
// Check if the marker file exists in the file system
val markerFileExists = fs.exists(path)
if (markerFileExists)
logger.info(s"MarkerFile $markerFile exists")
markerFileExists
}
/**
* Deletes a marker file from the file system.
*
* @param markerFile The path of the marker file to delete
*/
private def deleteMarkerFile(markerFile: String): Unit = {
logger.info(s"Deleting marker file: $markerFile")
// Create a new Path object with the provided marker file path
val path = new Path(markerFile)
// Get the FileSystem associated with the path
val fs = path.getFileSystem(new Configuration())
// Check if the marker file exists
if (fs.exists(path)) {
// Delete the marker file
fs.delete(path, true)
// Log a message indicating the successful deletion of the marker file
logger.info(s"MarkerFile $markerFile successfully deleted")
}
}
}
Spark Submit Command:
Before running the script below, update the BOOTSTRAP_SERVERS value to point to your Kafka brokers.
#!/bin/bash
echo "Running <$0> script"
BOOTSTRAP_SERVERS="localhost:9092"
GROUP_ID="test-group"
TOPICS="test-topic"
MARKER_FILE="/apps/spark/streaming/shutdown/spark_streaming_kafka_marker_file"
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 2g \
--num-executors 2 \
--executor-cores 2 \
--conf spark.dynamicAllocation.enabled=false \
--name SparkStreamingKafkaGracefulShutdownMarkerApp \
--class SparkStreamingKafkaGracefulShutdownMarkerApp \
/tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar $BOOTSTRAP_SERVERS $GROUP_ID $TOPICS $MARKER_FILE
echo "Finished <$0> script"
How to pass the shutdown signal:
hdfs dfs -mkdir -p /apps/spark/streaming/shutdown
hdfs dfs -touch /apps/spark/streaming/shutdown/spark_streaming_kafka_marker_file
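If the -touch option is not available in your Hadoop version, hdfs dfs -touchz can be used instead to create the empty marker file. You can also confirm that the marker file was created before the application picks it up:
hdfs dfs -ls /apps/spark/streaming/shutdown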
2.3 Using an HTTP Service:
This approach exposes a socket thread or an HTTP service in the driver to receive shutdown requests, and the streaming program waits for that trigger before closing. The socket thread or HTTP service must be started in the driver.
Using an HTTP service is recommended, because a raw socket is fairly low-level and a little more complicated to work with. With an HTTP service, we can directly use the Jetty server embedded in Spark to expose an HTTP interface, so no additional dependencies need to be introduced.
In the driver program, start an embedded Jetty server (or any other HTTP server of your choice) that listens on a specific port for incoming requests. When a shutdown request is received, the handler initiates the graceful shutdown of the Spark Streaming application.
To initiate the shutdown, send an HTTP request to the driver's IP address (which can be found in the application startup log or on the Spark UI) and the configured port for the shutdown endpoint, using curl, a web browser, or any other HTTP client. This request triggers the shutdown process and gracefully stops the Spark Streaming application.
Benefits:
Here are some of the benefits of using an HTTP service to gracefully shut down a Spark Streaming application:
Drawbacks:
Here are some of the drawbacks of using an HTTP service to gracefully shut down a Spark Streaming application:
Implementation:
Here are the steps to implement this approach:
1. In the driver, start an embedded Jetty server listening on a configurable port (in this example, StopByHttpHandler.httpServer).
2. Register a handler on a shutdown context path (in this example, /shutdown/<appName>) that stops the StreamingContext gracefully when a request is received.
3. To stop the application, send an HTTP request to the driver host and the configured port for that path.
SparkStreamingKafkaGracefulShutdownHttpApp.scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object SparkStreamingKafkaGracefulShutdownHttpApp extends App with Serializable {
// Create a logger instance for logging messages
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
// Define AppName
private val appName = getClass.getSimpleName.replace("$", "")
if (args.length < 4) {
logger.error(s"Usage\t: $appName <bootstrapServers> <groupId> <topics> <jettyPort>")
logger.info(s"Example\t: $appName <localhost:9092> <my_group> <test_topic> <3443>")
System.exit(1)
}
// Consume the first four command line arguments (an optional fifth argument sets the batch duration)
private val Array(bootstrapServers, groupId, topic, jettyPort) = args.take(4)
// Create a SparkConf object
val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")
// Create StreamingContext, with batch duration in seconds
private val duration = if (args.length > 4) args(4).toInt else 30
private val batchDuration = Seconds(duration)
logger.info(s"Creating StreamingContext with batch duration $duration seconds...")
val streamingContext = new StreamingContext(sparkConf, batchDuration)
logger.info("StreamingContext created successfully ...")
// Define Kafka Parameters
private val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
// Create a set of topics from a string
private val topics: Set[String] = topic.split(",").map(_.trim).toSet
// Create a streaming context from Kafka
private val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// Get the lines of text from Kafka
val lines = stream.map(record => record.value())
// Split lines into words
val words = lines.flatMap(_.split(" "))
// Map every word to a tuple
private val wordMap = words.map(word => (word, 1))
// Count occurrences of each word
val wordCounts = wordMap.reduceByKey(_ + _)
// Print the word count
wordCounts.print()
// Start stream processing
streamingContext.start()
logger.info("StreamingContext Started ...")
// Start the HTTP server that accepts the stop request
StopByHttpHandler.httpServer(jettyPort.toInt, streamingContext, appName)
// Wait for the computation to terminate
streamingContext.awaitTermination()
}
StopByHttpHandler.scala
import org.apache.log4j.Logger
import org.apache.spark.streaming._
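// Note: the imports below use the Jetty classes shaded inside Spark (org.spark_project.jetty in Spark 2.x);
// on Spark 3.x the shaded package is org.sparkproject.jetty instead.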
import org.spark_project.jetty.server.handler.{AbstractHandler, ContextHandler}
import org.spark_project.jetty.server.{Request, Server}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
object StopByHttpHandler {
// Create a logger instance for logging messages
@transient private lazy val logger: Logger = Logger.getLogger(getClass.getSimpleName.replace("$", ""))
/**
* Responsible for starting the guardian jetty service
*
* @param port - The port number exposed externally
* @param ssc - Streaming Context
* @param appName - The name of the Spark application
*/
def httpServer(port: Int, ssc: StreamingContext, appName: String): Unit = {
// Log a debug message indicating the start of the HTTP server
logger.debug("Starting the HTTP Server with the port " + port)
// Create a new Jetty server instance
val server = new Server(port)
// Create a new context handler
val context = new ContextHandler()
// Set the context path for the HTTP server
val contextPath = "/shutdown/" + appName
logger.info(s"Server Context Path $contextPath")
context.setContextPath(contextPath)
// Set the handler for the context to a custom handler for stopping the StreamingContext
context.setHandler(new StopStreamingContextHandler(ssc))
// Set the handler for the server
server.setHandler(context)
// Start the Jetty server
server.start()
// Log a message indicating the successful start of the HTTP server
logger.info("HttpServer started successfully")
}
/**
* Responsible for accepting http requests to gracefully shutdown the streaming application
*
* @param ssc - Streaming Context
*/
private class StopStreamingContextHandler(ssc: StreamingContext) extends AbstractHandler {
override def handle(target: String, baseRequest: Request, request: HttpServletRequest,
response: HttpServletResponse): Unit = {
// Log the target of the request
logger.info("Serving target: " + target)
// Log a message indicating the graceful stopping of the Spark Streaming application
logger.info("Gracefully stopping Spark Streaming Application")
// Stop the StreamingContext, including the underlying SparkContext, gracefully
ssc.stop(stopSparkContext = true, stopGracefully = true)
// Log a message indicating the successful stopping of the application
logger.info("Spark Streaming Application successfully stopped")
// Set the content type and status of the HTTP response
response.setContentType("text/html; charset=utf-8")
response.setStatus(HttpServletResponse.SC_OK)
// Write a response message to the HTTP response
response.getWriter.println("The Spark Streaming application has been successfully stopped.")
// Mark the base request as handled
baseRequest.setHandled(true)
}
}
}
Spark Submit Command:
Before running the script below, update the BOOTSTRAP_SERVERS value to point to your Kafka brokers.
#!/bin/bash
echo "Running <$0> script"
BOOTSTRAP_SERVERS="localhost:9092"
GROUP_ID="test-group"
TOPICS="test-topic"
HTTP_PORT=3443
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 2g \
--num-executors 2 \
--executor-cores 2 \
--conf spark.dynamicAllocation.enabled=false \
--name SparkStreamingKafkaGracefulShutdownHttpApp \
--class SparkStreamingKafkaGracefulShutdownHttpApp \
/tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar $BOOTSTRAP_SERVERS $GROUP_ID $TOPICS $HTTP_PORT
echo "Finished <$0> script"
How to pass the shutdown signal:
Step 1: Navigate to the Spark UI -> Executors tab and find the driver host. In this example, the driver host is localhost.
Step 2: Open the following URL in your browser to stop the application.
http://localhost:3443/shutdown/SparkStreamingKafkaGracefulShutdownHttpApp
In your case, replace localhost with your Spark application's driver host (and 3443 with the port you configured).
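Alternatively, you can trigger the shutdown from the command line with curl, using the same driver host and port:
curl http://localhost:3443/shutdown/SparkStreamingKafkaGracefulShutdownHttpApp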
We will get the following message after a successful shutdown:
The Spark Streaming application has been successfully stopped.
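Since the job was submitted in YARN cluster mode, you can also confirm that the application is no longer running, for example with the YARN CLI:
yarn application -list -appStates RUNNING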
In this article, we discussed the importance of graceful shutdown in Spark Streaming applications. Graceful shutdown is required to ensure that all in-flight data is processed and that no data is lost. We also saw different ways to shut down a streaming application gracefully. Finally, we discussed the recommended ways to use graceful shutdown in production.
Created on 04-16-2025 05:20 AM
Hi @RangaReddy,
Is something similar also applicable to Structured Streaming? Thanks.