
Monitoring Spark Jobs

Master Guru

What's the best way to monitor Spark jobs? The Spark History Server (SHS) provides some information, but not in a very user-friendly way. Has anybody tried Prometheus and Grafana? Spark is running on YARN, and 80% of the cluster's jobs/apps are Spark-based.

1 ACCEPTED SOLUTION

New Contributor

Spark provides several ways to expose metrics (http://spark.apache.org/docs/latest/monitoring.html), including a GraphiteSink for sending metrics to Graphite. Unfortunately, there's no native way to send metrics to Prometheus.

To use Prometheus, you can scrape from Graphite, as suggested in the blog. Another way is to expose Spark metrics via the JmxSink and set up the Prometheus JMX exporter to transform the JMX MBeans into metrics readable by Prometheus. A nice blog post about it is here: https://argus-sec.com/blog/monitoring-spark-prometheus.
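
For reference, wiring up either sink is just a few lines in Spark's metrics.properties. A minimal sketch (the Graphite host, port and prefix are placeholder values, not something from this thread):

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark

# Or expose the metrics as JMX MBeans for the Prometheus JMX exporter to scrape
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

On YARN the file is typically shipped with --files metrics.properties --conf spark.metrics.conf=metrics.properties, and for the JMX route the Prometheus JMX exporter is usually attached as a -javaagent on the driver and executor JVMs (e.g. via spark.driver.extraJavaOptions and spark.executor.extraJavaOptions).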


6 REPLIES


Master Guru

Thanks, we'll try with GraphiteSink first.

New Contributor

I've used the Prometheus Pushgateway. The hard part of monitoring a Spark job is that you never know which server it is going to run on; that's what the Pushgateway is for. From your job you push metrics to the gateway instead of relying on Prometheus's default pull/scrape model.

Here you can find some sample code:

Add a StreamingListener to the context:

streamingContext.addStreamingListener(new PrometheusSparkMetrics(streamingContext.sparkContext.appName))

The PrometheusSparkMetrics:

package com.godatadriven.twitter_classifier

import io.prometheus.client.exporter.PushGateway
import io.prometheus.client.{CollectorRegistry, Gauge}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class PrometheusSparkMetrics(sparkJob: String) extends StreamingListener {

  // Called by Spark at the end of every micro-batch: collect the batch metrics
  // into a fresh registry and push them to the Pushgateway.
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val registry: CollectorRegistry = new CollectorRegistry()
    val pushGateway: PushGateway = new PushGateway("127.0.0.1:9091")
    addInputRate(batchCompleted, registry)
    addSchedulingDelay(batchCompleted, registry)
    addProcessingTime(batchCompleted, registry)
    addTotalDelay(batchCompleted, registry)
    pushGateway.push(registry, "spark_streaming_exporter")
  }

  def addInputRate(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    addMetric(registry, batchCompleted.batchInfo.numRecords, "spark_streaming_input_rate", "The input rate of our spark streaming job")
  }

  // The delay fields are Options, but they are always defined once the batch has completed.
  def addSchedulingDelay(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    addMetric(registry, batchCompleted.batchInfo.schedulingDelay.get, "spark_streaming_scheduling_delay", "The scheduling delay of our spark streaming job")
  }

  def addProcessingTime(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    addMetric(registry, batchCompleted.batchInfo.processingDelay.get, "spark_streaming_processing_time", "The processing delay of our spark streaming job")
  }

  def addTotalDelay(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    addMetric(registry, batchCompleted.batchInfo.totalDelay.get, "spark_streaming_total_delay", "The total delay of our spark streaming job")
  }

  // Register a gauge in the per-batch registry and label it with the job name.
  def addMetric(registry: CollectorRegistry, value: Double, name: String, helpText: String): Unit = {
    val gauge: Gauge = Gauge.build()
      .help(helpText)
      .name(name)
      .labelNames("spark_job")
      .register(registry)
    gauge.labels(sparkJob).set(value)
  }
}
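
In case anyone wants to try the listener above: it needs the Prometheus Java client on the classpath and a Pushgateway instance running (the Pushgateway listens on port 9091 by default, which matches the address used in the code). A minimal sbt sketch, with the version number being just an example rather than anything prescribed here:

// Prometheus Java client core plus the Pushgateway exporter used by the listener
libraryDependencies ++= Seq(
  "io.prometheus" % "simpleclient" % "0.16.0",
  "io.prometheus" % "simpleclient_pushgateway" % "0.16.0"
)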

Master Guru

Thanks for the code! We'll definitely keep this in mind, but for the task at hand we don't have access to the source of the Spark apps, so we'll go with GraphiteSink or JmxSink.


Interesting. Spark 1.6 added a way to start plugin classes in the driver on YARN clusters; adding one to set up the Prometheus listener should be straightforward. Once implemented, all you'd need to do to start it is add it to the classpath and list the class name in the right Spark configuration option.

New Contributor

Typically I would add driver classpath entries using --driver-class-path. What is this new Spark 1.6 plugin functionality? Is it different from using --driver-class-path?