
Storm provides a rich set of built-in metrics, which are shown in the Storm UI and also sent to metrics consumers. However, these built-in metrics are measured per task, while a single task sometimes makes several sequential calls to external storage and/or performs heavy computation. The built-in metrics cannot tell us how much time each of those steps takes.

Let’s say we have a bolt that crawls a page from the web and stores the page in external storage.

public class StoreCrawledPageBolt extends BaseRichBolt {

  @Override
  public void execute(Tuple input) {
    String url = input.getString(0);

    // the latencies of crawlPage and storePage can vary widely
    String html = crawlPage(url);
    storePage(url, html);
    collector.ack(input);
  }

  // collector field, prepare(), declareOutputFields(), crawlPage() and storePage() omitted...
}

The latency of crawling a page from a web server varies widely: it can take anywhere from several milliseconds to tens of seconds, and at the high end it might be better treated as a timeout. Storing the page data in external storage is usually relatively fast, but it can slow down for specific reasons, such as a major compaction in HBase.

Since both calls are part of execute() in StoreCrawledPageBolt, Storm only reports the average execute latency of the bolt as a whole. That number hides the latency of each call, which prevents us from analyzing how much each part contributes to the overall latency.
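To make this concrete, here is a small self-contained Java sketch with made-up latencies (hypothetical numbers, not measurements). In the two scenarios a different step is slow, yet the per-tuple execute latency (crawl + store) has the same mean, so the bolt-level number alone cannot tell them apart.

```java
// Hypothetical latencies in milliseconds, not measurements from the article.
class HiddenSplitExample {
    // Mean of a set of latencies.
    static double mean(long[] values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return (double) sum / values.length;
    }

    // The execute latency of each tuple is the sum of the two sequential steps.
    static long[] executeLatencies(long[] crawl, long[] store) {
        long[] total = new long[crawl.length];
        for (int i = 0; i < crawl.length; i++) {
            total[i] = crawl[i] + store[i];
        }
        return total;
    }

    public static void main(String[] args) {
        // Scenario 1: crawl is slow, store is fast.
        long[] crawlSlow = {1500, 1700, 1300};
        long[] storeFast = {100, 100, 100};
        // Scenario 2: crawl is fast, store is slow (e.g. during a major compaction).
        long[] crawlFast = {100, 100, 100};
        long[] storeSlow = {1500, 1700, 1300};

        System.out.println(mean(executeLatencies(crawlSlow, storeFast))); // 1600.0
        System.out.println(mean(executeLatencies(crawlFast, storeSlow))); // 1600.0
        System.out.println(mean(crawlSlow) + " vs " + mean(crawlFast));   // 1500.0 vs 100.0
    }
}
```

Only the per-step means (the last line) reveal which part is responsible.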

In this case, we can instrument user-defined metrics to measure the latency of each part. Let’s see how to do this.

First, we define two metrics to measure the latency of each part. Storm provides several types of metrics; a ReducedMetric with a MeanReducer calculates the mean of the values logged to it.

private transient ReducedMetric latencyForCrawl;
private transient ReducedMetric latencyForStore;
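Conceptually, a ReducedMetric with a MeanReducer keeps a running sum and count, reports their ratio, and starts over. The following is a simplified, hypothetical Java stand-in (MeanMetricSketch is not a Storm class, and this is not Storm's implementation) that only mirrors that behavior; returning null when nothing was logged is a choice of this sketch.

```java
// Hypothetical, simplified stand-in for ReducedMetric + MeanReducer.
// Not Storm's code: it only illustrates "accumulate, report the mean, reset".
class MeanMetricSketch {
    private double sum = 0;
    private long count = 0;

    // Log one value, e.g. a latency in milliseconds.
    void update(long value) {
        sum += value;
        count++;
    }

    // Return the mean of the values logged since the last call (null if none),
    // then reset the accumulator so the next period starts empty.
    Double getValueAndReset() {
        Double result = null;
        if (count > 0) {
            result = sum / count;
        }
        sum = 0;
        count = 0;
        return result;
    }

    public static void main(String[] args) {
        MeanMetricSketch latency = new MeanMetricSketch();
        latency.update(10);
        latency.update(20);
        latency.update(30);
        System.out.println(latency.getValueAndReset()); // 20.0
        System.out.println(latency.getValueAndReset()); // null
    }
}
```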

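Note that the fields are declared transient. IMetric is not Serializable, and the bolt instance is serialized when the topology is built and submitted, so we mark the fields transient to avoid serialization issues and create the metric instances in prepare() instead.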
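The effect of transient can be seen with plain Java serialization. In this hypothetical sketch, NotSerializableMetric stands in for an IMetric, DemoBolt for the bolt, and DemoBolt.prepare() plays the role of prepare(); the serialize/deserialize round trip approximates a bolt being shipped to a worker. The transient field comes back null, which is why the metrics have to be created in prepare().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for an IMetric: deliberately NOT Serializable.
class NotSerializableMetric {
    long value;
}

// Hypothetical stand-in for the bolt.
class DemoBolt implements Serializable {
    String name = "store-crawled-page";      // serialized normally
    transient NotSerializableMetric latency; // skipped by Java serialization

    // Plays the role of prepare(): runs after deserialization.
    void prepare() {
        latency = new NotSerializableMetric();
    }
}

class TransientDemo {
    // Serialize and deserialize an object, roughly what happens to a bolt on its way to a worker.
    static DemoBolt roundTrip(DemoBolt bolt) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(bolt); // without 'transient' this would throw NotSerializableException
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DemoBolt) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoBolt bolt = new DemoBolt();
        bolt.latency = new NotSerializableMetric();
        DemoBolt copy = roundTrip(bolt);
        System.out.println(copy.name);            // store-crawled-page
        System.out.println(copy.latency == null); // true: must be re-created
        copy.prepare();
        System.out.println(copy.latency != null); // true
    }
}
```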

Next, we initialize the two metrics in prepare() and register the metric instances.

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  // other initialization here.
  latencyForCrawl = new ReducedMetric(new MeanReducer());
  latencyForStore = new ReducedMetric(new MeanReducer());
  // arguments: metric name, IMetric instance, time bucket size in seconds
  context.registerMetric("latency-crawl", latencyForCrawl, 60);
  context.registerMetric("latency-store", latencyForStore, 60);
}

The first and second parameters are straightforward: the metric name and the IMetric instance. The third parameter of TopologyContext#registerMetric is the time bucket size in seconds, i.e. the period at which the metric is published and reset. So latencyForCrawl and latencyForStore accumulate logged values for 60 seconds, publish their mean, and then reset to the initial state, as if no values had been logged at all.
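The following hypothetical Java sketch (not Storm code) simulates the 60-second bucketing on a list of (timestamp, latency) events: each bucket reports the mean of only the values logged within it. Bucket boundaries here are simply multiples of 60 seconds from an arbitrary start, which is a simplification of when Storm actually publishes.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical simulation of 60-second time buckets; not Storm code.
class TimeBucketSketch {
    static final int BUCKET_SECS = 60;

    // events[i] = {secondsSinceStart, latencyMillis}; returns bucket index -> mean latency.
    // Buckets in which nothing was logged do not appear in the result.
    static SortedMap<Long, Double> meanPerBucket(long[][] events) {
        SortedMap<Long, long[]> acc = new TreeMap<>(); // bucket -> {sum, count}
        for (long[] event : events) {
            long[] sumAndCount = acc.computeIfAbsent(event[0] / BUCKET_SECS, k -> new long[2]);
            sumAndCount[0] += event[1];
            sumAndCount[1]++;
        }
        SortedMap<Long, Double> means = new TreeMap<>();
        for (Map.Entry<Long, long[]> entry : acc.entrySet()) {
            long[] sc = entry.getValue();
            means.put(entry.getKey(), (double) sc[0] / sc[1]);
        }
        return means;
    }

    public static void main(String[] args) {
        long[][] events = {
            {5, 1000}, {30, 2000},  // first 60 s
            {70, 500}, {110, 1500}, // second 60 s
            {125, 300}              // third 60 s
        };
        System.out.println(meanPerBucket(events)); // {0=1500.0, 1=1000.0, 2=300.0}
    }
}
```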

Finally, let’s log the latency of each part of execute().

@Override
public void execute(Tuple input) {
  String url = input.getString(0);

  // the latencies of crawlPage and storePage can vary widely
  long start = System.currentTimeMillis();
  String html = crawlPage(url);
  latencyForCrawl.update(System.currentTimeMillis() - start);

  start = System.currentTimeMillis();
  storePage(url, html);
  latencyForStore.update(System.currentTimeMillis() - start);

  collector.ack(input);
}
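If the timing code is repeated across several steps, it can be factored into a helper. The sketch below is a hypothetical alternative, not the article's code: Timing.timed is an invented name, it assumes Java 8+ (lambdas), and it swaps System.currentTimeMillis() for System.nanoTime(), which is monotonic and intended for measuring elapsed time. Because the measurement is recorded in a finally block, a step that throws is still timed.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical helper (not part of Storm): runs one step, reports its elapsed
// time in milliseconds to a callback, and returns the step's result.
class Timing {
    static <T> T timed(Supplier<T> step, LongConsumer recordMillis) {
        long start = System.nanoTime(); // monotonic clock, suited to measuring intervals
        try {
            return step.get();
        } finally {
            // Runs even if step.get() throws, so failed calls are timed too.
            recordMillis.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
    }

    public static void main(String[] args) {
        long[] recorded = new long[1];
        String html = timed(() -> {
            try {
                Thread.sleep(50); // stands in for crawlPage(url)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "hello world";
        }, ms -> recorded[0] = ms);
        System.out.println(html + " took about " + recorded[0] + " ms");
    }
}
```

Inside the bolt this might read `String html = Timing.timed(() -> crawlPage(url), latencyForCrawl::update);`, which should compile as long as crawlPage declares no checked exceptions, as in the first snippet.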

We’re done! The mean latency of each part will be published to the metrics consumers registered for the topology (for example via Config#registerMetricsConsumer), and you can attach a time-series DB to store the metrics and display them as time series.

Refer to http://storm.apache.org/releases/1.0.1/Metrics.html for a detailed explanation of the metrics feature.

I prepared some screenshots of the Storm UI and a Grafana dashboard, based on an example topology, that confirm the explanation above. In the example topology, crawl sleeps for a random latency between 0 and 3000 milliseconds and store for a random latency between 0 and 200 milliseconds, so in this case the crawl part should contribute more to the overall latency.
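As a quick sanity check on what the screenshots should show: Random.nextInt(bound) is uniform over 0 to bound - 1, so its expected value is (bound - 1) / 2. The sketch below (ExpectedLatency is a hypothetical name; the bounds are LATENCY_BOUND_OF_CRAWL and LATENCY_BOUND_OF_STORE from Appendix A) computes the expected sleep of each part, ignoring any overhead beyond the sleeps themselves.

```java
// Expected sleep time per tuple in the example topology, ignoring all other overhead.
class ExpectedLatency {
    // Random.nextInt(bound) is uniform over 0..bound-1, so its mean is (bound - 1) / 2.
    static double expectedMeanMillis(int bound) {
        return (bound - 1) / 2.0;
    }

    public static void main(String[] args) {
        double crawl = expectedMeanMillis(3000); // LATENCY_BOUND_OF_CRAWL
        double store = expectedMeanMillis(200);  // LATENCY_BOUND_OF_STORE
        System.out.println(crawl);               // 1499.5
        System.out.println(store);               // 99.5
        System.out.println(crawl + store);       // 1599.0
        System.out.println(crawl / (crawl + store)); // crawl's share, roughly 0.94
    }
}
```

So crawl should account for roughly 94% of the sleep time per tuple.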

(The full source code is available in Appendix A.)

Let’s look at the execute latency in the Storm UI.

[Screenshot, 2016-07-05: Storm UI showing the execute latency of StoreCrawledPageBolt]

It shows that the average execute latency is about 1730 milliseconds, which is very high. But until we see the latency of each part, we can’t determine which part is actually causing it.

Next, let’s look at a dashboard configured to show only the latencies of that bolt.

[Screenshot, 2016-07-05: Grafana dashboard showing the crawl and store latencies of the bolt]

As the graph on the left shows, the average crawl latency is about 1.5 seconds, while the average store latency is about 100 milliseconds. So the latency issue lies in crawl, and we can take proper action, such as splitting the bolt into two and setting higher parallelism on the crawl bolt.
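A rough way to decide how much parallelism each part needs is Little's law: the average number of tuples in progress equals the arrival rate times the mean latency. The sketch below is a back-of-the-envelope estimate, not a Storm feature; ParallelismEstimate is a hypothetical name, and it assumes about 10 tuples per second (the sample URLSpout sleeps 100 ms between emits) while ignoring topology.max.spout.pending, which caps in-flight tuples at 10 in the sample topology.

```java
// Back-of-the-envelope executor sizing with Little's law (not a Storm API):
// tuples in progress ~= arrival rate x mean latency.
class ParallelismEstimate {
    // Rounds up, using integer arithmetic to avoid floating-point surprises.
    static long executorsNeeded(long tuplesPerSecond, long meanLatencyMillis) {
        return (tuplesPerSecond * meanLatencyMillis + 999) / 1000;
    }

    public static void main(String[] args) {
        long rate = 10; // assumption: ~1 tuple per 100 ms, as emitted by URLSpout
        System.out.println(executorsNeeded(rate, 1500)); // crawl: 15
        System.out.println(executorsNeeded(rate, 100));  // store: 1
    }
}
```

With these numbers almost all of the required parallelism comes from crawl, so that is where additional executors should go.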

By the way, the hard part of applying this in production is that it requires you to operate another time-series DB and a dashboard service (like Grafana). Fortunately, the upcoming HDP 2.5 supports Storm metrics integration with Ambari Metrics Service (AMS, a time-series metrics service) and also ships with built-in Grafana. So you can simply configure the AMS Storm plugin as the topology metrics consumer and build your dashboards in Grafana.

P.S. Thanks to @Sriharsha Chintalapani for the great idea. This article is a walkthrough of his suggestion.

Appendix A.

Full code of sample topology

package com.hortonworks.storm.test;

import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

public class CustomMetricTopology {
  static class URLSpout extends BaseRichSpout {
    private static final String[] urlList = {"http://hortonworks.com", "https://community.hortonworks.com"};

    private Random random = new Random();
    private SpoutOutputCollector collector;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.collector = collector;
    }

    public void nextTuple() {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // skip
      }

      collector.emit(new Values(urlList[random.nextInt(urlList.length)]), UUID.randomUUID());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("url"));
    }
  }

  static class StoreCrawledPageBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(StoreCrawledPageBolt.class);
    private static final int TIME_BUCKET_SIZE_IN_SECS = 60;
    private static final int LATENCY_BOUND_OF_CRAWL = 3000;
    private static final int LATENCY_BOUND_OF_STORE = 200;

    private transient ReducedMetric latencyForCrawl;
    private transient ReducedMetric latencyForStore;
    private Random random;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;

      random = new Random();

      latencyForCrawl = new ReducedMetric(new MeanReducer());
      latencyForStore = new ReducedMetric(new MeanReducer());

      context.registerMetric("latency-crawl", latencyForCrawl, TIME_BUCKET_SIZE_IN_SECS);
      context.registerMetric("latency-store", latencyForStore, TIME_BUCKET_SIZE_IN_SECS);
    }

    @Override
    public void execute(Tuple input) {
      String url = input.getString(0);
      LOG.info("crawling and storing url: {}", url);

      try {
        long start = System.currentTimeMillis();
        String html = crawlPage(url);
        latencyForCrawl.update(System.currentTimeMillis() - start);

        start = System.currentTimeMillis();
        storePage(url, html);
        latencyForStore.update(System.currentTimeMillis() - start);
      } catch (InterruptedException e) {
        // skip
        LOG.info("Interrupted, skipping...");
      }

      collector.ack(input);
    }

    private String crawlPage(String url) throws InterruptedException {
      Thread.sleep(random.nextInt(LATENCY_BOUND_OF_CRAWL));
      return "hello world";
    }

    private void storePage(String url, String html) throws InterruptedException {
      Thread.sleep(random.nextInt(LATENCY_BOUND_OF_STORE));
      LOG.info("Storing page for {} complete.", url);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // not needed as we don't emit any tuples to downstream
    }

  }

  public static void main(String[] args)
      throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    URLSpout urlSpout = new URLSpout();
    StoreCrawledPageBolt storeCrawledPageBolt = new StoreCrawledPageBolt();

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("urlSpout", urlSpout);
    builder.setBolt("storeCrawledPageBolt", storeCrawledPageBolt, 5).shuffleGrouping("urlSpout");

    StormTopology topology = builder.createTopology();
    Map<String, Object> conf = new HashMap<>();
    conf.put("topology.max.spout.pending", 10);
    StormSubmitter.submitTopology("custom-metric-topology", conf, topology);
  }
}