Member since
01-12-2019
23
Posts
15
Kudos Received
5
Solutions
06-25-2018
03:11 AM
1 Kudo
The topic of the document
This document describes how state is stored in memory for each operator, so that you can estimate how much memory the application needs and plan executor heap memory accordingly.
Memory usage of UnsafeRow object
UnsafeRow is an unsafe implementation of Row, which is backed by raw memory instead of Java objects. Both the key and the value of a state entry are stored as UnsafeRow, so understanding how an UnsafeRow object consumes memory lets us forecast how much memory the overall state will consume.
Structure of the raw memory
Raw memory is laid out in the following format: [null bit set] [values] [variable length portion]
null bit set
  used for null tracking
  aligned to 8 byte word boundaries
  stores one bit per field
values
  store one 8 byte word per field
  if the type of the field is a fixed-length primitive type (long, double, int, etc.)
    the value is stored directly
  if the type of the field is non-primitive or variable-length
    the value is reference information for the actual value in raw memory
      higher 32 bits: relative offset (starting from base offset)
      lower 32 bits: size of the actual value
variable length portion
  raw values of non-primitive or variable-length fields are placed here
For example, suppose the fields in schema are (int, double, string backed by 1000 bytes of byte array), then the length of underlying raw memory would be (8 + 8 + 8 + 8 + 1000) = 1032.
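The arithmetic in the example above can be reproduced with a small helper. This is only a sketch of the layout rules listed in this section, not Spark code: the class and method names are hypothetical, and padding variable-length data to 8-byte words is an assumption (it does not affect the example, since 1000 is already a multiple of 8).

```java
// Rough size estimate for an UnsafeRow, following the layout described above.
// All names here are hypothetical; this is not part of Spark.
final class UnsafeRowSizeEstimate {

    // Null bit set: one bit per field, rounded up to whole 8-byte words.
    static long nullBitSetBytes(int numFields) {
        return ((numFields + 63) / 64) * 8L;
    }

    // Assumption: variable-length data is padded to a multiple of 8 bytes.
    static long roundToWord(long bytes) {
        return ((bytes + 7) / 8) * 8;
    }

    // numFields: total number of fields in the schema.
    // variableLengths: byte length of each variable-length value.
    static long estimate(int numFields, long... variableLengths) {
        long size = nullBitSetBytes(numFields) + 8L * numFields;
        for (long len : variableLengths) {
            size += roundToWord(len);
        }
        return size;
    }

    // Packs the reference word of a variable-length field:
    // relative offset in the higher 32 bits, size in the lower 32 bits.
    static long packOffsetAndSize(int offset, int size) {
        return (((long) offset) << 32) | (size & 0xFFFFFFFFL);
    }

    public static void main(String[] args) {
        // (int, double, string backed by 1000 bytes of byte array)
        System.out.println(estimate(3, 1000)); // prints 1032
    }
}
```

Multiplying such a per-row estimate by the expected number of keys gives a first approximation of the state size for one version; measure real rows before relying on it.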
I have tested various kinds of schema of UnsafeRow with Spark 2.4.0-SNAPSHOT (based on https://github.com/apache/spark/commit/fa2ae9d2019f839647d17932d8fea769e7622777):
https://gist.github.com/HeartSaVioR/556ea7db6740fa2fce7dae72a75d9618
Please note the difference between the size of a row object and the size of a copied row object. UnsafeRow doesn't allow updating variable-length values, so this is not an issue there, but some internal row implementations expand the underlying raw memory to a multiple of its current size when there is not enough space.
Key/value format of state for operators
NOTE: The format is based on Spark 2.3.0 and depends on implementation details, so it is subject to change.
StateStoreSaveExec
Operations: agg(), count(), mean(), max(), avg(), min(), sum()
Format of key-value
key: UnsafeRow containing group-by fields
value: UnsafeRow containing key fields and additional fields for aggregation results
Note: avg() (and aliases of avg) takes 2 fields, sum and count, in the value UnsafeRow
FlatMapGroupsWithStateExec
Operations: mapGroupsWithState() / flatMapGroupsWithState()
Format of key-value
key: UnsafeRow containing group-by fields
value: UnsafeRow containing fields for the intermediate data schema defined by end users
  no longer containing redundant key part in value
StreamingDeduplicateExec
Operations: dropDuplicates()
Format of key-value
key: UnsafeRow containing fields specified for deduplication (parameters)
value: EMPTY_ROW defined in StreamingDeduplicateExec
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
  null reference: 16 bytes
StreamingSymmetricHashJoinExec
Only applies to stream-stream joins, because stream-static joins don't require state.
Spark maintains two separate states for each input side, keyWithIndexToValue and keyWithNumValues, to store multiple input rows which have the same values for the joining keys.
Operations: stream - stream join
Format of key-value for keyWithIndexToValue
key: UnsafeRow containing joining fields + index number (starting from 0)
value: input row itself
Format of key-value for keyWithNumValues
key: UnsafeRow containing joining fields
value: UnsafeRow containing count (only one field)
The impact of storing multiple versions from HDFSBackedStateStoreProvider
HDFSBackedStateStoreProvider (the only state store provider implementation that Apache Spark provides) keeps multiple versions of state in in-memory hash maps. By default it can keep more than 100 versions, depending on when the maintenance operation runs.
When the map for version N is loaded, it loads the state map for version N - 1 and makes a shallow copy of it. This behavior brings a couple of things to note:
Actual key and value objects (UnsafeRow) are shared between the maps for multiple batches, unless the value for a key is updated.
  We normally expect that only the value is updated when the key already exists in the map. So although both the key and the value are copied before being put into the map, the copied key is put into the map only when the key doesn't exist there yet.
  Heavy updates on state between batches prevent row objects from being shared and incur heavy memory usage, because multiple versions are stored.
Map entries and references have to be maintained for each version. If the state holds a huge number of key-value pairs, this may add some overhead, driven by the number of entries rather than by their size. It is still expected to be much smaller than the actual key and value objects.
UPDATE on Spark 2.4
SPARK-24717 addressed the concern of storing multiple versions in memory: the default number of state versions kept in memory will be 2 instead of 100. The default number of state versions kept in storage will still be 100.
SPARK-24763 addressed the concern of storing redundant columns in the value for StateStoreSaveExec: the key columns will be removed from the value part of the state, which makes the state smaller.
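The sharing behavior of the shallow copy described for HDFSBackedStateStoreProvider can be illustrated with plain Java maps. This is a simplified sketch, not Spark's code: the class name is hypothetical, and String keys and byte[] values stand in for the UnsafeRow key and value objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-version state maps; not Spark code.
final class VersionedStateSketch {

    // Builds the map for the next version as a shallow copy of the previous one:
    // the new map has its own entries, but points at the same key/value objects.
    static Map<String, byte[]> nextVersion(Map<String, byte[]> previous) {
        return new HashMap<>(previous);
    }

    public static void main(String[] args) {
        Map<String, byte[]> v1 = new HashMap<>();
        v1.put("a", new byte[1032]);
        v1.put("b", new byte[1032]);

        Map<String, byte[]> v2 = nextVersion(v1);
        v2.put("b", new byte[1032]); // "b" is updated in the new batch

        // Untouched entry: both versions share one value object.
        System.out.println(v1.get("a") == v2.get("a")); // prints true
        // Updated entry: each version now holds its own value object.
        System.out.println(v1.get("b") == v2.get("b")); // prints false
    }
}
```

In this picture, every retained version costs one set of map entries, plus one extra row object for each key whose value changed in that version.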
06-29-2017
01:02 AM
1 Kudo
That was a bug on the Storm side, tracked and fixed in HDP 2.6.1.1 and HDF 3.0.0.0. Please upgrade your HDP cluster or use an HDF Storm cluster, and let me know if the problem persists. Unfortunately, the bug is in "uploading" artifacts: the uploaded artifact blobs get a bad ACL, so you actually need to delete the existing artifact blobs manually after upgrading.
03-09-2017
05:07 PM
topology.debug and setting the log level to DEBUG are different things. The first makes Storm itself log debug information; the second makes the logger write messages at DEBUG level and the more severe levels. If your message is not logged, the cause may not be the log level at all (for example, the code path may simply not be reached).
02-02-2017
03:19 PM
1 Kudo
Latencies for each component exclude queue wait time and transfer latency between workers. 'Complete latency' means that all the nodes in the tuple tree are acked, so it reflects the slowest path of the tree. By the way, behind the scenes 'complete latency' includes the time the spout waits before handling the ack from the acker, so if your spout spends a long time in nextTuple it will heavily affect 'complete latency'. This is fixed in STORM-1742 and will be included in the next release (Storm 1.0.3 on the Apache side; I'm not sure which HDP / HDF versions contain the fix, so please let me know your HDP/HDF version). Hope this helps.
02-02-2017
03:14 PM
1 Kudo
This was filed as STORM-2335 and has just been fixed. On the Apache side it will be available in Storm 1.1.0 (and maybe 1.0.3, whose release vote is in progress), and I think the patch will be applied to a later HDP / HDF version. Hope this helps.
11-17-2016
10:04 AM
2 Kudos
HDP 2.5 is actually based on Apache Storm 1.1.0-SNAPSHOT, which means that it has some features that are not yet available in official Apache Storm releases. "Topology Spout Lag" is one of them. You can just ignore that error since it doesn't affect the topology, or change the version of 'storm-kafka' to 1.0.1.2.5.0.0-1245 (NOTE: Hortonworks version) to make it work. Hope this helps.
07-07-2016
04:13 PM
1 Kudo
Storm in HDP 2.4 is based on Apache Storm 0.10.0 but is not identical to it, since it has the Nimbus H/A feature, which is available in Apache Storm 1.0.0. You can change your storm-core dependency to match your Storm version, like 0.10.0-2.4.x.xxx according to your HDP version, and see if it helps. You may also want to add the Hortonworks repository (http://repo.hortonworks.com/content/repositories/releases) to your maven pom or gradle config.
07-06-2016
01:12 PM
Storm provides a rich set of built-in metrics, which are exposed in Storm UI and also to metrics consumers. However, these built-in metrics are measured per task, while we occasionally configure a task to make multiple calls to external storage and/or to run heavy computations sequentially.
Let’s say we have a bolt which crawls a page from the web and stores the page to external storage.
public class StoreCrawledPageBolt extends BaseRichBolt {
    @Override
    public void execute(Tuple input) {
        String url = input.getString(0);
        // the latency of crawlPage and storePage can vary
        String html = crawlPage(url);
        storePage(url, html);
        collector.ack(input);
    }
    // skip method implementations...
}
The latency for crawling a page from a web server varies: it takes from several milliseconds to tens of seconds, at which point it might be better to treat it as a time-out. The latency for storing page data to external storage is usually lower, but it can get higher for specific reasons such as a major compaction in HBase.
Since the two calls are both part of execute() in StoreCrawledPageBolt, Storm only provides the average execute latency for the bolt as a whole. The execute latency hides the latency of each call, which prevents us from analyzing how much each part contributes to the latency.
In this case, we can add user-defined metrics to measure the latency of each part. Let’s see how to do this.
First, we define two metrics to measure each part of the latency. Storm provides several types of metric, and ReducedMetric with MeanReducer calculates the mean of the logged values.
private transient ReducedMetric latencyForCrawl;
private transient ReducedMetric latencyForStore;
Please note that we declare them as transient. IMetric is not Serializable, so we declare the fields as transient to avoid any serialization issues.
Next, we initialize the two metrics and register the metric instances.
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    // other initialization here.
    latencyForCrawl = new ReducedMetric(new MeanReducer());
    latencyForStore = new ReducedMetric(new MeanReducer());
    context.registerMetric("latency-crawl", latencyForCrawl, 60);
    context.registerMetric("latency-store", latencyForStore, 60);
}
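Why the metric fields are transient and created in prepare() can be seen with plain Java serialization. The sketch below uses hypothetical stand-in classes (FakeMetric, FakeBolt) rather than Storm's own types: a transient field is skipped when the object is serialized, so it is null after deserialization and has to be created again on the receiving side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins; not Storm classes.
final class TransientFieldSketch {

    // Plays the role of a metric class that is not Serializable.
    static final class FakeMetric {
        long total;
    }

    // Plays the role of a bolt that is serialized before it runs.
    static final class FakeBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeMetric latency; // skipped by Java serialization

        void prepare() {
            latency = new FakeMetric();
        }

        boolean hasMetric() {
            return latency != null;
        }
    }

    // Serializes the bolt to bytes and reads it back, as a round trip.
    static FakeBolt roundTrip(FakeBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FakeBolt) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeBolt copy = roundTrip(new FakeBolt());
        System.out.println(copy.hasMetric()); // prints false
        copy.prepare();
        System.out.println(copy.hasMetric()); // prints true
    }
}
```

Without the transient keyword, writeObject would fail with a NotSerializableException as soon as the field held a FakeMetric instance.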
The first and second parameters are straightforward: the metric name and the IMetric instance. The third parameter of TopologyContext#registerMetric is the period (in seconds) at which the metric is published and reset. So latencyForCrawl and latencyForStore collect values for 60 seconds, publish the mean, and are then reset to their default state (nothing logged).
Last, let’s log the latency of each part of execute.
@Override
public void execute(Tuple input) {
    String url = input.getString(0);
    // the latency of crawlPage and storePage can vary
    long start = System.currentTimeMillis();
    String html = crawlPage(url);
    latencyForCrawl.update(System.currentTimeMillis() - start);
    start = System.currentTimeMillis();
    storePage(url, html);
    latencyForStore.update(System.currentTimeMillis() - start);
    collector.ack(input);
}
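To make the "collect, publish the mean, reset" cycle concrete, here is a minimal plain-Java sketch of a mean metric. It only mimics the behavior described above and is not Storm's ReducedMetric or MeanReducer; the class name is hypothetical, and returning null for an empty period is an assumption of this sketch.

```java
// Hypothetical stand-in for a mean metric; not Storm's implementation.
final class MeanMetricSketch {
    private long sum;
    private long count;

    // Records one observation, like update() on the metric in execute().
    void update(long value) {
        sum += value;
        count++;
    }

    // Returns the mean of the values recorded since the last call and resets.
    // Assumption: an empty period yields null ("nothing logged").
    Double getValueAndReset() {
        Double mean = (count == 0) ? null : Double.valueOf((double) sum / count);
        sum = 0;
        count = 0;
        return mean;
    }

    public static void main(String[] args) {
        MeanMetricSketch latency = new MeanMetricSketch();
        latency.update(1500);
        latency.update(500);
        System.out.println(latency.getValueAndReset()); // prints 1000.0
        System.out.println(latency.getValueAndReset()); // prints null
    }
}
```

Because the value is reset on every publish, each published data point covers only its own period, which is what makes the series meaningful on a time-series dashboard.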
We’re done! The mean latency of the two parts will be published to the topology metrics consumer, and you can attach a time-series DB so that the published metrics are shown as time series. You can refer to http://storm.apache.org/releases/1.0.1/Metrics.html for a detailed explanation of the metrics feature.
I prepared some screenshots of Storm UI and a Grafana dashboard which confirm the explanation above, based on an example topology. In the example topology I put a random latency between 0 and 3000 milliseconds on crawl, and between 0 and 200 milliseconds on store, so in this case the crawl part should contribute more to the overall latency. (Full source code is available in Appendix A.)
Let’s see the execute latency on Storm UI. It shows that the average execute latency is about 1730 milliseconds, which is very high. But until we see the latency of each part, we can’t determine which part exactly causes the high latency.
Now let’s see a dashboard simply configured to show only the latencies for that bolt. As the graph on the left shows, the average latency of crawl is about 1.5 seconds, while the average latency of store is about 100 milliseconds. We can see that the latency issue is in crawl, and take proper action such as splitting the bolt into two and setting higher parallelism on the crawl bolt.
By the way, the hard part of applying this in production is that it requires you to operate another time-series DB and a dashboard service (like Grafana). Fortunately, the upcoming HDP 2.5 supports Storm metrics integration with Ambari Metrics Service (a time-series metrics service) and also has built-in Grafana. So you can just configure the AMS Storm plugin as the topology metrics consumer and enjoy configuring dashboards in Grafana.
ps. Thanks to @Sriharsha Chintalapani for the great idea. This article is a walkthrough of his suggestion.
Appendix A. Full code of sample topology
package com.hortonworks.storm.test;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

public class CustomMetricTopology {
    static class URLSpout extends BaseRichSpout {
        private static final String[] urlList = {"http://hortonworks.com", "https://community.hortonworks.com"};
        private Random random = new Random();
        private SpoutOutputCollector collector;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        public void nextTuple() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // skip
            }
            collector.emit(new Values(urlList[random.nextInt(urlList.length)]), UUID.randomUUID());
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("url"));
        }
    }

    static class StoreCrawledPageBolt extends BaseRichBolt {
        private static final Logger LOG = LoggerFactory.getLogger(StoreCrawledPageBolt.class);
        private static final int TIME_BUCKET_SIZE_IN_SECS = 60;
        private static final int LATENCY_BOUND_OF_CRAWL = 3000;
        private static final int LATENCY_BOUND_OF_STORE = 200;
        private transient ReducedMetric latencyForCrawl;
        private transient ReducedMetric latencyForStore;
        private Random random;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            random = new Random();
            latencyForCrawl = new ReducedMetric(new MeanReducer());
            latencyForStore = new ReducedMetric(new MeanReducer());
            context.registerMetric("latency-crawl", latencyForCrawl, TIME_BUCKET_SIZE_IN_SECS);
            context.registerMetric("latency-store", latencyForStore, TIME_BUCKET_SIZE_IN_SECS);
        }

        @Override
        public void execute(Tuple input) {
            String url = input.getString(0);
            LOG.info("crawling and storing url: {}", url);
            try {
                long start = System.currentTimeMillis();
                String html = crawlPage(url);
                latencyForCrawl.update(System.currentTimeMillis() - start);
                start = System.currentTimeMillis();
                storePage(url, html);
                latencyForStore.update(System.currentTimeMillis() - start);
            } catch (InterruptedException e) {
                // skip
                LOG.info("Interrupted, skipping...");
            }
            collector.ack(input);
        }

        private String crawlPage(String url) throws InterruptedException {
            Thread.sleep(random.nextInt(LATENCY_BOUND_OF_CRAWL));
            return "hello world";
        }

        private void storePage(String url, String html) throws InterruptedException {
            Thread.sleep(random.nextInt(LATENCY_BOUND_OF_STORE));
            LOG.info("Storing page for {} complete.", url);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // not needed as we don't emit any tuples to downstream
        }
    }

    public static void main(String[] args)
            throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        URLSpout urlSpout = new URLSpout();
        StoreCrawledPageBolt storeCrawledPageBolt = new StoreCrawledPageBolt();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("urlSpout", urlSpout);
        builder.setBolt("storeCrawledPageBolt", storeCrawledPageBolt, 5).shuffleGrouping("urlSpout");
        StormTopology topology = builder.createTopology();
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.max.spout.pending", 10);
        StormSubmitter.submitTopology("custom-metric-topology", conf, topology);
    }
}
01-27-2016
01:43 AM
3 Kudos
FYI, on Storm 0.10.0-beta or later, you can set 'topology.testing.always.try.serialize' to 'true' when testing your topology in local mode to force (de)serialization, and turn it off when distributing work to multiple hosts.
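As a small sketch, the setting can be placed in the topology configuration map like any other key. The class name and the localMode flag below are hypothetical, and how the map is passed on (for example to a local cluster submission) is left out; only the configuration key comes from the post above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds a topology configuration map.
final class LocalTestConfSketch {

    // localMode is a hypothetical flag: true for local testing,
    // false when the topology is distributed to multiple hosts.
    static Map<String, Object> buildConf(boolean localMode) {
        Map<String, Object> conf = new HashMap<>();
        // Forces (de)serialization of tuples even within a single process.
        conf.put("topology.testing.always.try.serialize", localMode);
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(buildConf(true).get("topology.testing.always.try.serialize")); // prints true
    }
}
```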