Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
avatar
Master Collaborator

Spark Streaming Graceful shutdown

1. Introduction

 In the first part of the blog post, we discussed the importance of graceful shutdown in Spark Streaming applications.

We also saw different ways to shutdown a Spark Streaming application gracefully, including:

  • Explicitly calling the JVM Shutdown Hook in the driver program.
  • Using spark.streaming.stopGracefullyOnShutdown = true setting.

In the second part, we will discuss a better way to gracefully shutdown the streaming application using other approaches.

 

2. Use an external approach to control internal program shutdown

This approach allows an external process to control the shutdown of the internal program by sending a marker record, which is detected by the internal program. The internal program can then initiate a graceful shutdown procedure, ensuring that any unsaved data is saved and any resources are properly released before exiting.

Benefits:

  • It is more reliable than relying on the internal application to shut down gracefully on its own.
  • It allows the external application to have more control over the shutdown process.
  • It can be used to shut down the internal application even if it is not responding to requests.

Drawbacks:

  • It requires the external application to be aware of the internal application's shutdown process.
  • It can add complexity to the shutdown process.

2.1 Different Implementations:

There are different ways to implement Spark Streaming shutdown gracefully using this approach.

  • Using a File System: In this approach, the marker record is a file that is written to a file system (e.g., HDFS, NFS, AWS S3, Google Cloud Storage etc.). The marker file is created in a specified directory, and its presence indicates that the streaming application should be stopped. The Spark Streaming application periodically checks for the existence of the file and if it is found, the application stops processing new data and shuts down gracefully. This approach is straightforward to implement and can work well in most scenarios.
  • Using a Database: In this approach, a marker record is stored in a relational database (e.g., MySQL, PostgreSQL, Oracle), indicating that the application should stop processing new data. A marker record is a row in a database table. The application checks the value of the marker record at regular intervals and stops when it indicates that the application should be stopped. This approach provides a centralized way of managing the marker record without introducing a new marker record into the input stream.
  • Using a Messaging System: In this approach, a marker message is sent to a message queue (e.g., Kafka, RabbitMQ, Pulsar) when the application should be stopped. The Spark Streaming application subscribes to the message topic and listens for the marker message. When the marker message is received, the application stops processing new data and shuts down gracefully. This approach provides a more flexible and scalable way of managing the marker record, as multiple instances of the application can listen to the same message queue.
  • Using a Distributed Cache: In this approach, the marker record is stored in a distributed cache (e.g., Redis) and periodically checked by the streaming application. If the marker record is present in the cache, the application stops processing new data and shuts down gracefully after processing the current batch.

Each of these approaches has its advantages and disadvantages, and the best approach depends on the specific requirements of the use case.

 

2.2 Using a File System:

We can use any file system to create a marker file. In this example, I used the HDFS file system to create the marker file.

Here are the steps to achieve graceful shutdown using the Marker Approach with HDFS:

  1. Define a marker file: The marker file is a simple text file that is created when the application is stopped. It should have a unique name that is unlikely to be used in the normal data stream.
  2. Periodically check for the presence of the marker file: The streaming application can use the Hadoop FileSystem API to periodically check for the presence of the marker file in the HDFS directory. This can be done every few seconds or minutes, depending on the application's requirements.
  3. If the marker file is detected, call ssc.stop() to shut down the application gracefully: Once the marker file is detected, the streaming application should call ssc.stop() to shut down the application gracefully.

Implementation:

SparkStreamingKafkaGracefulShutdownMarkerApp.scala

 

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkStreamingKafkaGracefulShutdownMarkerApp extends App with Serializable {

  // Create a logger instance for logging messages
  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

  // Define AppName
  private val appName = getClass.getSimpleName.replace("$", "")

  if (args.length < 4) {
    logger.error(s"Usage\t: $appName <bootstrapServers> <groupId> <topics> <markerFile>")
    logger.info(s"Example\t: $appName <localhost:9092> <my_group> <test_topic> </tmp/graceful_shutdown>")
    System.exit(1)
  }

  // Consume command line arguments
  private val Array(bootstrapServers, groupId, topic, markerFile) = args

  // Create a SparkConf object
  val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")

  // Create StreamingContext, with batch duration in seconds
  private val duration = if (args.length > 4) args(4).toInt else 30
  logger.info(s"Creating StreamingContext with batch duration $duration seconds...")
  private val batchDuration = Seconds(duration)
  val streamingContext = new StreamingContext(sparkConf, batchDuration)
  logger.info("StreamingContext created successfully ...")

  // Define Kafka Parameters
  private val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
    ConsumerConfig.GROUP_ID_CONFIG -> groupId,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
  )

  // Create a set of topics from a string
  private val topics: Set[String] = topic.split(",").map(_.trim).toSet

  // Create a streaming context from Kafka
  private val stream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  )

  // Get the lines of text from Kafka
  val lines = stream.map(record => record.value())

  // Split lines into words
  val words = lines.flatMap(_.split(" "))

  // Map every word to a tuple
  private val wordMap = words.map(word => (word, 1))

  // Count occurrences of each word
  val wordCounts = wordMap.reduceByKey(_ + _)

  // Print the word count
  wordCounts.print()

  // Start stream processing
  streamingContext.start()
  logger.info("StreamingContext Started ...")

  // Gracefully shutdown the Spark Streaming application when the termination marker is received
  StopByMarkerFileSystem.stopByMarkerFile(streamingContext, markerFile)

  // Wait for the computation to terminate
  streamingContext.awaitTermination()
}

 

StopByMarkerFileSystem.scala

 

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.Logger
import org.apache.spark.streaming.StreamingContext

object StopByMarkerFileSystem {

  // Create a logger instance for logging messages
  @transient private lazy val logger: Logger = Logger.getLogger(getClass.getSimpleName.replace("$", ""))

  /**
   * Stops the Spark Streaming context gracefully based on the presence of a marker file.
   *
   * @param ssc         The StreamingContext to stop
   * @param markerFile  The path of the marker file indicating when to stop
   */
  def stopByMarkerFile(ssc: StreamingContext, markerFile: String): Unit = {
    var isStop = false
    while (!isStop) {
      if (!isStop && checkMarkerFileExists(markerFile)) {
        logger.info("Marker file exists, stopping the Spark Streaming Context gracefully ...")
        // Stop the streaming context if it has not already terminated
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        // Delete the marker file
        deleteMarkerFile(markerFile)
        logger.info("Spark Streaming Context is Stopped ...")
        isStop = true
      }
    }
  }

  /**
   * Checks if a marker file exists in the file system.
   *
   * @param markerFile The path of the marker file
   * @return True if the marker file exists, false otherwise
   */

  private def checkMarkerFileExists(markerFile: String): Boolean = {
    // Create a new Path object with the provided marker file path
    val path = new Path(markerFile)

    // Get the FileSystem associated with the path
    val fs = path.getFileSystem(new Configuration())

    // Check if the marker file exists in the file system
    val markerFileExists = fs.exists(path)
    if (markerFileExists)
      logger.info(s"MarkerFile $markerFile exists")
    markerFileExists
  }

  /**
   * Deletes a marker file from the file system.
   *
   * @param markerFile The path of the marker file to delete
   */
  private def deleteMarkerFile(markerFile: String): Unit = {
    logger.info(s"Deleting marker file: $markerFile")

    // Create a new Path object with the provided marker file path
    val path = new Path(markerFile)

    // Get the FileSystem associated with the path
    val fs = path.getFileSystem(new Configuration())

    // Check if the marker file exists
    if (fs.exists(path)) {
      // Delete the marker file
      fs.delete(path, true)
      // Log a message indicating the successful deletion of the marker file
      logger.info(s"MarkerFile $markerFile successfully deleted")
    }
  }

}

 

Spark Submit Command:

 Before running the below script you need to update the BOOTSTRAP_SERVERS value.

 

#!/bin/bash
echo "Running <$0> script"

BOOTSTRAP_SERVERS="localhost:9092"
GROUP_ID="test-group"
TOPICS="test-topic"
MARKER_FILE="/apps/spark/streaming/shutdown/spark_streaming_kafka_marker_file"

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 2g \
  --num-executors 2 \
  --executor-cores 2 \
  --conf spark.dynamicAllocation.enabled=false \
  --name SparkStreamingKafkaGracefulShutdownMarkerApp \
  --class SparkStreamingKafkaGracefulShutdownMarkerApp \
  /tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar $BOOTSTRAP_SERVERS $GROUP_ID $TOPICS $MARKER_FILE

echo "Finished <$0> script"

 

How to pass Shutdown Signal:

hdfs dfs -mkdir -p /apps/spark/streaming/shutdown 
hdfs dfs -touch /apps/spark/streaming/shutdown/spark_streaming_kafka_marker_file

3. Expose the interface to the outside world and provide the shutdown function

This approach internally exposes a socket thread or http service to receive requests, and waits for the trigger to close the streaming program. A socket thread or http service needs to be started in the driver.

 

It is recommended to use the http service as the socket is slightly low-level for processing and it is a little more complicated. If we use http service, we can directly use the embedded jetty to expose an http interface to the outside. One more advantage of using the http service is availability of embedded Jetty libraries within Spark, so we don’t required to introduce additional dependencies.

 

In the driver program, start an embedded Jetty server or any other HTTP server of your choice. The server should listen on a specific port to accept incoming requests. When the shutdown request is received, the HTTP service should send a signal or command to the Spark Streaming application to initiate the shutdown process.

 

To initiate the shutdown, you can use various methods such as the curl commands, web browsers, or other HTTP client tools. Send an HTTP request to the driver's IP address (which can be seen in the application log of the program startup, or on the Spark master UI page) and the configured port for the shutdown endpoint. This request will trigger the shutdown process and gracefully stop the Spark Streaming application.

 

Benefits:

Here are some of the benefits of using an HTTP service to gracefully shutdown a Spark Streaming application:

  • It is easy to implement.
  • It does not relay on any external storage system.

Drawbacks:

Here are some of the drawbacks of using an HTTP service to gracefully shutdown a Spark Streaming application:

  • It requires an additional HTTP service to be running.
  • Ensure that the driver application is reachable from the outside world.
  • Make sure each application is using unique port number.

Implementation:

 

Here are some steps on how to implement this approach:

  • Create a simple HTTP server that listens for a shutdown request.
  • When the shutdown request is received, the HTTP server can then send a signal to the Spark Streaming application to shut down gracefully.
  • Find the IP address where the driver is located. This can be seen in the log of the program startup, or on the Spark master UI page.
  • Once you have the IP address, you can use curl or a web browser to send the shutdown request to the HTTP server.

SparkStreamingKafkaGracefulShutdownHttpApp.scala

 

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkStreamingKafkaGracefulShutdownHttpApp extends App with Serializable {

  // Create a logger instance for logging messages
  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

  // Define AppName
  private val appName = getClass.getSimpleName.replace("$", "")

  if (args.length < 4) {
    logger.error(s"Usage\t: $appName <bootstrapServers> <groupId> <topics> <jettyPort>")
    logger.info(s"Example\t: $appName <localhost:9092> <my_group> <test_topic> <3443>")
    System.exit(1)
  }

  // Consume command line arguments
  private val Array(bootstrapServers, groupId, topic, jettyPort) = args

  // Create a SparkConf object
  val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")

  // Create StreamingContext, with batch duration in seconds
  private val duration = if (args.length > 4) args(4).toInt else 30
  private val batchDuration = Seconds(duration)
  logger.info(s"Creating StreamingContext with batch duration $duration seconds...")
  val streamingContext = new StreamingContext(sparkConf, batchDuration)
  logger.info("StreamingContext created successfully ...")

  // Define Kafka Parameters
  private val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
    ConsumerConfig.GROUP_ID_CONFIG -> groupId,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
  )

  // Create a set of topics from a string
  private val topics: Set[String] = topic.split(",").map(_.trim).toSet

  // Create a streaming context from Kafka
  private val stream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  )

  // Get the lines of text from Kafka
  val lines = stream.map(record => record.value())

  // Split lines into words
  val words = lines.flatMap(_.split(" "))

  // Map every word to a tuple
  private val wordMap = words.map(word => (word, 1))

  // Count occurrences of each word
  val wordCounts = wordMap.reduceByKey(_ + _)

  // Print the word count
  wordCounts.print()

  // Start stream processing
  streamingContext.start()
  logger.info("StreamingContext Started ...")

  // Start the HTTP server that accepts the stop request
  StopByHttpHandler.httpServer(jettyPort.toInt, streamingContext, appName)

  // Wait for the computation to terminate
  streamingContext.awaitTermination()
}

 

StopByHttpHandler.scala

 

import org.apache.log4j.Logger
import org.apache.spark.streaming._
import org.spark_project.jetty.server.handler.{AbstractHandler, ContextHandler}
import org.spark_project.jetty.server.{Request, Server}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

object StopByHttpHandler {

  // Create a logger instance for logging messages
  @transient private lazy val logger: Logger = Logger.getLogger(getClass.getSimpleName.replace("$", ""))

  /**
   * Responsible for starting the guardian jetty service
   *
   * @param port    - The port number exposed externally
   * @param ssc     - Streaming Context
   * @param appName - The name of the Spark application
   */
  def httpServer(port: Int, ssc: StreamingContext, appName: String): Unit = {
    // Log a debug message indicating the start of the HTTP server
    logger.debug("Starting the HTTP Server with the port " + port)

    // Create a new Jetty server instance
    val server = new Server(port)

    // Create a new context handler
    val context = new ContextHandler()

    // Set the context path for the HTTP server
    val contextPath = "/shutdown/" + appName
    logger.info(s"Server Context Path $contextPath")
    context.setContextPath(contextPath)

    // Set the handler for the context to a custom handler for stopping the StreamingContext
    context.setHandler(new StopStreamingContextHandler(ssc))

    // Set the handler for the server
    server.setHandler(context)

    // Start the Jetty server
    server.start()

    // Log a message indicating the successful start of the HTTP server
    logger.info("HttpServer started successfully")
  }

  /**
   * Responsible for accepting http requests to gracefully shutdown the streaming application
   *
   * @param ssc - Streaming Context
   */
  private class StopStreamingContextHandler(ssc: StreamingContext) extends AbstractHandler {
    override def handle(target: String, baseRequest: Request, request: HttpServletRequest,
                        response: HttpServletResponse): Unit = {
      // Log the target of the request
      logger.info("Serving target: " + target)

      // Log a message indicating the graceful stopping of the Spark Streaming application
      logger.info("Gracefully stopping Spark Streaming Application")

      // Stop the StreamingContext, including the underlying SparkContext, gracefully
      ssc.stop(stopSparkContext = true, stopGracefully = true)

      // Log a message indicating the successful stopping of the application
      logger.info("Spark Streaming Application successfully stopped")

      // Set the content type and status of the HTTP response
      response.setContentType("text/html; charset=utf-8")
      response.setStatus(HttpServletResponse.SC_OK)

      // Write a response message to the HTTP response
      response.getWriter.println("The Spark Streaming application has been successfully stopped.")

      // Mark the base request as handled
      baseRequest.setHandled(true)
    }
  }
}

 

Spark Submit Command:

 

Before running the below script you need to update the BOOTSTRAP_SERVERS value.

 

#!/bin/bash
echo "Running <$0> script"

BOOTSTRAP_SERVERS="localhost:9092"
GROUP_ID="test-group"
TOPICS="test-topic"
HTTP_PORT=3443

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 2g \
  --num-executors 2 \
  --executor-cores 2 \
  --conf spark.dynamicAllocation.enabled=false \
  --name SparkStreamingKafkaGracefulShutdownHttpApp \
  --class SparkStreamingKafkaGracefulShutdownHttpApp \
  /tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar $BOOTSTRAP_SERVERS $GROUP_ID $TOPICS $HTTP_PORT

echo "Finished <$0> script"

 

How to pass the shutdown signal:

Step 1: Navigate to the Spark UI -> Executors -> Find out the driver. For example, the driver host is localhost.

Step 2: Run the following command in your browser to stop the application.

http://localhost:3443/shutdown/SparkStreamingKafkaGracefulShutdownHttpApp

In your case, you need to replace the Spark application driver host.

We will get the following message after a successful shutdown:

 

The Spark Streaming application has been successfully stopped.

4. Conclusion:

In this article, we discussed the importance of graceful shutdown in Spark Streaming applications. We saw that graceful shutdown is required to ensure that all data is processed and that no data is lost. We also saw different ways to shutdown a streaming application gracefully. Finally, we discussed the recommended ways to use graceful shutdown in production.

5. References

 

  1. Spark Streaming Graceful Shutdown - Part1
  2. Source Code 

 

2,020 Views
0 Kudos
Version history
Last update:
‎07-19-2023 02:27 AM
Updated by: