<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Monitoring Spark Jobs in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111183#M25881</link>
    <description>&lt;P&gt;
	I've used the Prometheus push gateway. The hard part of monitoring a Spark job is that you never know on which server it will run; that is exactly the problem the push gateway solves. From your job you push metrics to the gateway instead of relying on Prometheus's default pull/scrape model.&lt;/P&gt;&lt;P&gt;
	Here is some sample code:&lt;/P&gt;&lt;P&gt;Add a StreamingListener to the context:&lt;/P&gt;&lt;PRE&gt;streamingContext.addStreamingListener(new PrometheusSparkMetrics(streamingContext.sparkContext.appName))&lt;/PRE&gt;&lt;P&gt;The PrometheusSparkMetrics:&lt;/P&gt;&lt;PRE&gt;package com.godatadriven.twitter_classifier

import io.prometheus.client.exporter.PushGateway
import io.prometheus.client.{CollectorRegistry, Gauge}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class PrometheusSparkMetrics(sparkJob: String) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Use a fresh registry per batch so stale metrics are not re-pushed
    val registry: CollectorRegistry = new CollectorRegistry()
    // Address of the Prometheus push gateway; adjust to your environment
    val pushGateway: PushGateway = new PushGateway("127.0.0.1:9091")
    addInputRate(batchCompleted, registry)
    addSchedulingDelay(batchCompleted, registry)
    addProcessingTime(batchCompleted, registry)
    addTotalDelay(batchCompleted, registry)
    pushGateway.push(registry, "spark_streaming_exporter")
  }

  def addInputRate(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    // numRecords is the record count for this batch, not a true per-second rate
    addMetric(registry, batchCompleted.batchInfo.numRecords, "spark_streaming_input_rate", "The input rate of our spark streaming job")
  }

  def addSchedulingDelay(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    // The delays are Options; fall back to 0 instead of calling .get, which
    // throws when the value is not yet available
    addMetric(registry, batchCompleted.batchInfo.schedulingDelay.getOrElse(0L), "spark_streaming_scheduling_delay", "The scheduling delay of our spark streaming job")
  }

  def addProcessingTime(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    addMetric(registry, batchCompleted.batchInfo.processingDelay.getOrElse(0L), "spark_streaming_processing_time", "The processing delay of our spark streaming job")
  }

  def addTotalDelay(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    addMetric(registry, batchCompleted.batchInfo.totalDelay.getOrElse(0L), "spark_streaming_total_delay", "The total delay of our spark streaming job")
  }

  def addMetric(registry: CollectorRegistry, value: Double, name: String, helpText: String): Unit = {
    val gauge: Gauge = Gauge.build()
      .help(helpText)
      .name(name)
      .labelNames("spark_job")
      .register(registry)
    gauge.labels(sparkJob).set(value)
  }
}
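
// A possible cleanup sketch (my addition, not part of the original post):
// metrics pushed to the gateway persist after the streaming job stops, so
// stale values would keep being scraped. The same client library exposes
// PushGateway.delete, which removes a job's metric group; something like
// the following could run after streamingContext.awaitTermination():
//
//   new PushGateway("127.0.0.1:9091").delete("spark_streaming_exporter")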
&lt;/PRE&gt;</description>
    <pubDate>Fri, 22 Apr 2016 20:46:13 GMT</pubDate>
    <dc:creator>weverwijk</dc:creator>
    <dc:date>2016-04-22T20:46:13Z</dc:date>
    <item>
      <title>Monitoring Spark Jobs</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111181#M25879</link>
      <description>&lt;P&gt;What's the best way to monitor Spark jobs? The Spark History Server (SHS) provides some information, but not in a very user-friendly manner. Has anybody tried &lt;A href="http://rokroskar.github.io/monitoring-spark-on-hadoop-with-prometheus-and-grafana.html"&gt;Prometheus &lt;/A&gt;and Grafana? Spark is running on YARN, and 80% of our cluster jobs/apps are based on Spark.&lt;/P&gt;</description>
      <pubDate>Fri, 22 Apr 2016 11:47:09 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111181#M25879</guid>
      <dc:creator>pminovic</dc:creator>
      <dc:date>2016-04-22T11:47:09Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Spark Jobs</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111182#M25880</link>
      <description>&lt;P&gt;Spark provides several ways to expose metrics (http://spark.apache.org/docs/latest/monitoring.html), including a GraphiteSink for sending metrics to Graphite. Unfortunately there's no native way to send metrics to Prometheus.&lt;/P&gt;&lt;P&gt;To use Prometheus you can scrape from Graphite, as suggested in the blog. Another way is to expose Spark metrics via the JmxSink and set up a Prometheus JMX exporter to transform the JMX MBeans into metrics readable by Prometheus. A nice blog post about it is here: &lt;A href="https://argus-sec.com/blog/monitoring-spark-prometheus" target="_blank"&gt;https://argus-sec.com/blog/monitoring-spark-prometheus&lt;/A&gt;.&lt;/P&gt;</description>
      <pubDate>Fri, 22 Apr 2016 13:23:07 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111182#M25880</guid>
      <dc:creator>basharenslak</dc:creator>
      <dc:date>2016-04-22T13:23:07Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Spark Jobs</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111183#M25881</link>
      <description>&lt;P&gt;
	I've used the Prometheus push gateway. The hard part of monitoring a Spark job is that you never know on which server it will run; that is exactly the problem the push gateway solves. From your job you push metrics to the gateway instead of relying on Prometheus's default pull/scrape model.&lt;/P&gt;&lt;P&gt;
	Here is some sample code:&lt;/P&gt;&lt;P&gt;Add a StreamingListener to the context:&lt;/P&gt;&lt;PRE&gt;streamingContext.addStreamingListener(new PrometheusSparkMetrics(streamingContext.sparkContext.appName))&lt;/PRE&gt;&lt;P&gt;The PrometheusSparkMetrics:&lt;/P&gt;&lt;PRE&gt;package com.godatadriven.twitter_classifier

import io.prometheus.client.exporter.PushGateway
import io.prometheus.client.{CollectorRegistry, Gauge}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class PrometheusSparkMetrics(sparkJob: String) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Use a fresh registry per batch so stale metrics are not re-pushed
    val registry: CollectorRegistry = new CollectorRegistry()
    // Address of the Prometheus push gateway; adjust to your environment
    val pushGateway: PushGateway = new PushGateway("127.0.0.1:9091")
    addInputRate(batchCompleted, registry)
    addSchedulingDelay(batchCompleted, registry)
    addProcessingTime(batchCompleted, registry)
    addTotalDelay(batchCompleted, registry)
    pushGateway.push(registry, "spark_streaming_exporter")
  }

  def addInputRate(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    // numRecords is the record count for this batch, not a true per-second rate
    addMetric(registry, batchCompleted.batchInfo.numRecords, "spark_streaming_input_rate", "The input rate of our spark streaming job")
  }

  def addSchedulingDelay(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    // The delays are Options; fall back to 0 instead of calling .get, which
    // throws when the value is not yet available
    addMetric(registry, batchCompleted.batchInfo.schedulingDelay.getOrElse(0L), "spark_streaming_scheduling_delay", "The scheduling delay of our spark streaming job")
  }

  def addProcessingTime(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    addMetric(registry, batchCompleted.batchInfo.processingDelay.getOrElse(0L), "spark_streaming_processing_time", "The processing delay of our spark streaming job")
  }

  def addTotalDelay(batchCompleted: StreamingListenerBatchCompleted, registry: CollectorRegistry): Unit = {
    addMetric(registry, batchCompleted.batchInfo.totalDelay.getOrElse(0L), "spark_streaming_total_delay", "The total delay of our spark streaming job")
  }

  def addMetric(registry: CollectorRegistry, value: Double, name: String, helpText: String): Unit = {
    val gauge: Gauge = Gauge.build()
      .help(helpText)
      .name(name)
      .labelNames("spark_job")
      .register(registry)
    gauge.labels(sparkJob).set(value)
  }
}
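
// A possible cleanup sketch (my addition, not part of the original post):
// metrics pushed to the gateway persist after the streaming job stops, so
// stale values would keep being scraped. The same client library exposes
// PushGateway.delete, which removes a job's metric group; something like
// the following could run after streamingContext.awaitTermination():
//
//   new PushGateway("127.0.0.1:9091").delete("spark_streaming_exporter")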
&lt;/PRE&gt;</description>
      <pubDate>Fri, 22 Apr 2016 20:46:13 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111183#M25881</guid>
      <dc:creator>weverwijk</dc:creator>
      <dc:date>2016-04-22T20:46:13Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Spark Jobs</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111184#M25882</link>
      <description>&lt;P&gt;Interesting. Spark 1.6 added a way to start plugin classes in the driver on YARN clusters; adding one to set up the Prometheus listener should be straightforward. Once implemented, all you'd need to do to start it is add the class to the classpath and list the class name in the right Spark configuration option.&lt;/P&gt;</description>
      <pubDate>Sat, 23 Apr 2016 19:24:11 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111184#M25882</guid>
      <dc:creator>stevel</dc:creator>
      <dc:date>2016-04-23T19:24:11Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Spark Jobs</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111185#M25883</link>
      <description>&lt;P&gt;Thanks for the code! We'll definitely keep this in mind, but for our task at hand we don't have access to Spark source, so we'll go for GraphiteSink or JmxSink.&lt;/P&gt;</description>
      <pubDate>Sun, 24 Apr 2016 11:06:57 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111185#M25883</guid>
      <dc:creator>pminovic</dc:creator>
      <dc:date>2016-04-24T11:06:57Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Spark Jobs</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111186#M25884</link>
      <description>&lt;P&gt;Thanks, we'll try with GraphiteSink first.&lt;/P&gt;</description>
      <pubDate>Sun, 24 Apr 2016 11:07:54 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111186#M25884</guid>
      <dc:creator>pminovic</dc:creator>
      <dc:date>2016-04-24T11:07:54Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Spark Jobs</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111187#M25885</link>
      <description>&lt;P&gt;Typically I would add driver classpath entries using --driver-class-path. What is this new Spark 1.6 plugin functionality? Is it different from using --driver-class-path?&lt;/P&gt;</description>
      <pubDate>Tue, 26 Apr 2016 14:21:22 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Monitoring-Spark-Jobs/m-p/111187#M25885</guid>
      <dc:creator>basharenslak</dc:creator>
      <dc:date>2016-04-26T14:21:22Z</dc:date>
    </item>
  </channel>
</rss>

