Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Cloudera Employee

Introduction

Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data. In this article we explore the support for windowing operations and stateful processing that were recently added to Apache Storm.

Windowing support

Windowing computations is one of the common use cases in stream processing where the unbounded stream of data is split into finite sets based on some criteria (e.g. time) and computation is applied on each group of events. An example would be to compute the top trending twitter topic in the last hour.

Windowing is primarily used for aggregations, joins, pattern matching and more. Windows can be seen as an in-memory table where events are added and evicted based on some policies.

So far Apache Storm relied on developers to built their own windowing logic. There were no recommended or high level abstractions that developers could use to define a Window in a standard way in a Topology.

Windowing support was recently added to Apache Storm. Windows can be specified with the following two parameters,

  1. Window length - the length or duration of the window
  2. Sliding interval - the interval at which the window slides

Storm has support for sliding and tumbling windows based on time duration and/or event count.

Sliding Window

In a sliding window tuples are grouped in windows and window slides every sliding interval. For example, a time duration based sliding window of length 10 secs and sliding interval of 5 seconds is evaluated every 5 seconds.

1769-image01.png

Here the first window (w1) contains the events that arrived up to the 10th second and the second window (w2) contains the events that arrived between 5th and 15th second. Also note that events e3 - e6 are part of both windows. When window w2 is evaluated at time t = 15 secs, the events e1 and e2 are expired (dropped out of the event queue).

Tumbling Window

In a tumbling window tuples are grouped in a single window based on time or count. Any tuple belongs to only one of the windows. For example consider a time duration based tumbling window with length 5 secs.

1770-image02.png

Here the first window (w1) contains the events that arrived up to the 5th second, the second window (w2) contains the events that arrived between 5th and 10th second and the third window (w3) contains the events between 10th and 15th second and so on. The window is evaluated every five seconds and none of the windows overlap.

Window configuration

Storm supports specifying the window length and sliding intervals as a count of the number of tuples or as a time duration or a combination of both.

The bolt interface IWindowedBolt can be implemented by bolts that needs windowing support.

public interface IWindowedBolt extends IComponent {
   void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
   /**
     * Process tuples falling within the window and optionally emit 
     * new tuples based on the tuples in the input window.
     */
   void execute(TupleWindow inputWindow);
   void cleanup();
}

Every time the window slides (the sliding interval elapses), the execute method is invoked. The TupleWindow parameter gives access to the current tuples in the window, the tuples that expired and the new tuples that are added since last window was computed which can be used for efficient windowing computations.

Bolts that needs windowing support typically would extend BaseWindowedBolt which has the apis for specifying the window length and sliding intervals. For example,

public class SlidingWindowBolt extends BaseWindowedBolt {
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){
       this.collector = collector;
   }
   @Override
   public void execute(TupleWindow inputWindow) {
     for(Tuple tuple: inputWindow.get()) {
       // do the windowing computation
       ...
     }
     collector.emit(new Values(computedValue));
   }
}

The BaseWindowedBolt has apis to define the window parameters based on time duration, count or a combination of both. The following window configurations are supported.

/* 
 * Tuple count based sliding window that slides after slidingInterval number of tuples 
 */
withWindow(Count windowLength, Count slidingInterval)
/* 
 * Tuple count based window that slides with every incoming tuple 
 */
withWindow(Count windowLength)
/* 
 * Tuple count based sliding window that slides after slidingInterval time duration 
 */
withWindow(Count windowLength, Duration slidingInterval)
/* 
 * Time duration based sliding window that slides after slidingInterval time duration 
 */
withWindow(Duration windowLength, Duration slidingInterval)
/* 
 * Time duration based window that slides with every incoming tuple 
 */
withWindow(Duration windowLength)
/* 
 * Time duration based sliding window that slides after slidingInterval number of tuples 
 */
withWindow(Duration windowLength, Count slidingInterval)
/* 
 * Count based tumbling window that tumbles after the specified count of tuples 
 */
withTumblingWindow(BaseWindowedBolt.Count count)
/* 
 * Time duration based tumbling window that tumbles after the specified time duration 
 */
withTumblingWindow(BaseWindowedBolt.Duration duration)

The windowed bolts can be added to the topology via the TopologyBuilder similar to the normal bolts. For example,

TopologyBuilder builder = new TopologyBuilder();
/*
 * A windowed bolt that computes sum over a sliding window with window length of
 * 30 events that slides after every 10 events.
 */
builder.setBolt("sum", new WindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
       .shuffleGrouping("spout");

Tuple timestamp and out of order tuples

By default the timestamp tracked in the window is the time when the tuple is processed by the bolt. The window calculations are performed based on the processing timestamp. Storm has support for tracking windows based on the source generated timestamp. This can be useful for processing the events based on the time when the event occurred for example log entries with timestamps which are processed later.

/**
* Specify the tuple field that represents the timestamp as a long value. If this field
* is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName)

The value for the fieldName will be looked up from the incoming tuple and considered for windowing calculations. If this option is specified, all the tuples are expected to contain the timestamp field. If the field is not present in the tuple an exception will be thrown and the topology will terminate. To fix this, the offending tuple must be manually removed from the source (e.g. kafka) and topology must be restarted.

Along with the timestamp field name, a time lag parameter can also be specified which indicates the max time limit for tuples with out of order timestamps.

/**
* Specify the maximum time lag of the tuple timestamp in millis. The tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration)

E.g. If the lag is 5 secs and a tuple t1 arrived with timestamp 06:00:05 no tuples may arrive with tuple timestamp earlier than 06:00:00. If a tuple arrives with timestamp 05:59:59 after t1 and the window has moved past t1, it will be treated as a late tuple and not processed. Currently the late tuples are ignored and just logged in the worker log files at INFO level.

Watermarks

For processing tuples with timestamp field, storm internally computes watermarks based on the incoming tuple timestamp. Watermark is the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a higher level this is similar to the watermark concept used by Flink and Google's MillWheel for tracking event based timestamps.

Periodically (default every sec), the watermark timestamps are emitted and this is considered as the clock tick for the window calculation if tuple based timestamps are in use. The interval at which watermarks are emitted can be changed with the below api.

/**
* Specify the watermark event generation interval. Watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval)

When a watermark is received, all windows up to that timestamp will be evaluated. For example, consider tuple timestamp based processing with following window parameters,

Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s

Current timestamp = 09:00:00

Tuples e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36) are received between 9:00:00 and 9:00:01.

At time t = 09:00:01, watermark w1 = 6:00:31 is emitted since no tuples earlier than 6:00:31 can arrive.

Three windows will be evaluated. The first window end ts (06:00:10) is computed by taking the earliest event timestamp (06:00:03) and computing the ceiling based on the sliding interval (10s).

  1. 5:59:50 - 06:00:10 with tuples e1, e2, e3
  2. 6:00:00 - 06:00:20 with tuples e1, e2, e3, e4
  3. 6:00:10 - 06:00:30 with tuples e4, e5

e6 is not evaluated since watermark timestamp 6:00:31 is lesser than the tuple ts 6:00:36.

Tuples e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39) are received between 9:00:01 and 9:00:02

At time t = 09:00:02 another watermark w2 = 08:00:34 is emitted since no tuples earlier than 8:00:34 can arrive now. Three windows will be evaluated,

  1. 6:00:20 - 06:00:40 with tuples e5, e6 (from earlier batch)
  2. 6:00:30 - 06:00:50 with tuple e6 (from earlier batch)
  3. 8:00:10 - 08:00:30 with tuples e7, e8, e9

e10 is not evaluated since the tuple ts 8:00:39 is beyond the watermark time 8:00:34.

The window calculation considers the time gaps and computes the windows based on the tuple timestamp.

Guarantees

The windowing functionality in storm core currently provides at-least once guarantee. The values emitted from the bolt’s execute(TupleWindow inputWindow) method are automatically anchored to all the tuples in the inputWindow. The downstream bolts are expected to ack the received tuple (i.e the tuple emitted from the windowed bolt) to complete the tuple tree. If not the tuples will be replayed and the windowing computation will be re-evaluated.

The tuples in the window are automatically acknowledged when they fall out of the window after windowLength + slidingInterval. Note that the configuration topology.message.timeout.secs should be more than windowLength + slidingInterval for time based windows; otherwise the tuples will timeout and get replayed and can result in duplicate evaluations. For count based windows, the configuration should be adjusted such that windowLength + slidingInterval tuples can be received within the timeout period.

Example topology

An example topology in storm starter SlidingWindowTopology shows how to use the apis to compute a sliding window sum and a tumbling window average.

Future improvements

Currently the windowing computations are based on count or time intervals. i.e the windows are evaluated (triggered) after a specified time interval or after a specified number of tuples arrive. Though this addresses most of the common use cases, there are a few cases where one might want to trigger the windowing computation based on some custom criteria. For example, for events tracking the mouse movement, the window could be triggered after a predefined period of inactivity (user session). Ability for users to define custom triggers and evictors can be added in future for this purpose.

Currently the windowing support is available only in core storm, this can be extended to provide windowing support in storm trident as well.

Saving the window state

One of the issues with windowing is that the tuples cannot be acked until they completely fall out of the window. For instance, consider a one hour window that slides every minute. The tuples in the window is evaluated (passed to the bolt execute) every minute, but the tuples that arrived at the first minute are acked only after one hour and one minute. So if the system crashes just after one hour, storm’s acking mechanism replays all the tuples (from 0 - 60th minute) and will cause every window to be re-evaluated. That is the bolt’s execute method will be invoked again with the same set of tuples (60 times). One way to avoid this would be to save the state of the window evaluation (i.e. track the tuples that are already evaluated) in some external durable state and use this info to prune the duplicate window evaluation during recovery.

Next we will explore the storm support for state management and how it can be used to avoid duplicate window evaluations.

State Management

Stateful abstractions allow storm bolts to store and retrieve the state of its computation. So far Storm support for doing stateful computations was limited to trident apis and users had to write custom logic in bolts to save the state of its computations in storm core. With the addition of state management to core storm, the framework automatically and periodically snapshots the state of the bolts across the topology in a consistent manner. There is a default in-memory based state implementation and also a Redis backed implementation that provides state persistence.

Bolts that requires its state to be managed and persisted by the framework should implement the IStatefulBolt interface or extend the BaseStatefulBolt and implement void initState(T state) method. The initState method is invoked by the framework during the bolt initialization with the previously saved state of the bolt. This is invoked after prepare but before the bolt starts processing any tuples.

Currently the only kind of State implementation that is supported is KeyValueState which provides key-value mapping.

For example a word count bolt could use the key value state abstraction for the word counts as follows.

  1. Extend the BaseStatefulBolt and type parameterize it with KeyValueState which would store the mapping of word to count.
  2. The bolt gets initialized with its previously saved state in the init method. This will contain the word counts last committed by the framework during the previous run.
  3. In the execute method, update the word count.
    public class WordCountBolt 
    extends BaseStatefulBolt> {
      private KeyValueState wordCounts;
      ...
      @Override
      public void initState(KeyValueState state) {
        wordCounts = state;
      }
      @Override
      public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = wordCounts.get(word, 0);
        count++;
        wordCounts.put(word, count);
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
      }
      ...
    }
     
  4. The framework periodically checkpoints the state of the bolt (default every second). The frequency can be changed by setting the storm config topology.state.checkpoint.interval.ms.
  5. For state persistence, use a state provider that supports persistence by setting the topology.state.provider in the storm config. E.g. for Redis based key-value state implementation topology.state.provider can be set to org.apache.storm.redis.state.RedisKeyValueStateProvider in storm.yaml. The provider implementation jar should be in the class path, which in this case means putting the storm-redis-*.jar in the extlib directory.
  6. The state provider properties can be overridden by setting topology.state.provider.config. For Redis state this is a json config with the following properties.
  7. {
    "keyClass": "Optional fully qualified class name of the Key type.",
    "valueClass": "Optional fully qualified class name of the Value type.",
    "keySerializerClass": "Optional Key serializer implementation class.",
    "valueSerializerClass": "Optional Value Serializer implementation class.",
    "jedisPoolConfig": {
      "host": "localhost",
      "port": 6379,
      "timeout": 2000,
      "database": 0,
      "password": "xyz"
      }
    }
     

Checkpoint mechanism

Checkpoint is triggered by an internal checkpoint spout at the specified topology.state.checkpoint.interval.ms. If there is at least one IStatefulBolt in the topology, the checkpoint spout is automatically added by the topology builder . For stateful topologies, the topology builder wraps the IStatefulBolt in a StatefulBoltExecutor which handles the state commits on receiving the checkpoint tuples. The non stateful bolts are wrapped in a CheckpointTupleForwarder which just forwards the checkpoint tuples so that the checkpoint tuples can flow through the topology DAG. The checkpoint tuples flow through a separate internal stream namely $checkpoint. The topology builder wires the checkpoint stream across the whole topology with the checkpoint spout at the root.

1781-image03.png

At checkpoint intervals the checkpoint tuples are emitted by the checkpoint spout. On receiving a checkpoint tuple, the state of the bolt is saved and then the checkpoint tuple is forwarded to the next component. Each bolt waits for the checkpoint to arrive on all its input streams before it saves its state so that the state represents a consistent state across the topology. Once the checkpoint spout receives ACK from all the bolts, the state commit is complete and the transaction is recorded as committed by the checkpoint spout.

The checkpoint mechanism builds on top of storm's existing acking mechanism to replay the tuples and uses concepts from the asynchronous snapshot algorithm used by flink and the Chandy-Lamport algorithm for distributed snapshots.The checkpointing internally uses a three phase commit protocol with a prepare and commit phase so that the state across the topology is saved in a consistent and atomic manner.

Recovery

The recovery phase is triggered when the topology is started for the first time. If the previous transaction was not successfully prepared, a rollback message is sent across the topology so that if a bolt has some prepared transactions it can be discarded. If the previous transaction was prepared successfully but not committed, a commit message is sent across the topology so that the prepared transactions can be committed. After these steps are complete, the bolts are initialized with the state.

The recovery is also triggered if one of the bolts fails to acknowledge the checkpoint message or say a worker crashed in the middle. Thus when the worker is restarted by the supervisor, the checkpoint mechanism makes sure that the bolt gets initialized with its previous state and the checkpointing continues from the point where it left off.

Guarantee

Storm relies on the acking mechanism to replay tuples in case of failures. It is possible that the state is committed but the worker crashes before acking the tuples. In this case the tuples are replayed causing duplicate state updates. Also currently the StatefulBoltExecutor continues to process the tuples from a stream after it has received a checkpoint tuple on one stream while waiting for checkpoint to arrive on other input streams for saving the state. This can also cause duplicate state updates during recovery.

The state abstraction does not eliminate duplicate evaluations and currently provides only at-least once guarantee.

In order to provide the at-least once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once its processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the BaseBasicBolt. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing like in the WordCountBolt example in the State management section above.

IStateful bolt hooks

IStateful bolt interface provides hook methods where in the stateful bolts could implement some custom actions.

   /**
     * This is a hook for the component to perform some actions just before the
     * framework commits its state.
     */
   void preCommit(long txid);
   /**
     * This is a hook for the component to perform some actions just before the
     * framework prepares its state.
     */
   void prePrepare(long txid);
   /**
     * This is a hook for the component to perform some actions just before the
     * framework rolls back the prepared state.
     */
   void preRollback();

This is optional and stateful bolts are not expected to provide any implementation. This is provided so that other system level components can be built on top of the stateful abstractions where we might want to take some actions before the stateful bolt's state is prepared, committed or rolled back.

Providing custom state implementations

Currently the only kind of State implementation supported is KeyValueState which provides key-value mapping.

Custom state implementations should provide implementations for the methods defined in the State interface. These are the void prepareCommit(long txid), void commit(long txid) and rollback() methods. commit() method is optional and is useful if the bolt manages the state on its own. This is currently used only by the internal system bolts, for e.g. the CheckpointSpout to save its state. KeyValueState implementations should also implement the methods defined in the KeyValueState interface.

State provider

The framework instantiates the state via the corresponding StateProvider implementation. A custom state should also provide a StateProvider implementation which can load and return the state based on the namespace. Each state belongs to a unique namespace. The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding State implementation should be available in the class path of Storm (by placing them in the extlib directory).

Stateful windowing

The windowing implementation in storm core acknowledges the tuples in the window only when they fall out of the window. For example consider a window configuration with window length 5 mins and sliding interval 1 min. The tuples that arrived between 0 - 1 min are acked only when the window slides past 1 min i.e at the 6th minute.

1782-image00.png

If the system crashes tuples e1 to e8 gets replayed (assuming ack for e1 and e2 did not reach the acker) and w1, w2 and w3 will get re-evaluated. Stateful windowing tries to minimize the duplicate window evaluations by saving the last evaluated and last expired state of the window. Stateful windowing expects a monotonically increasing message id to be part of the tuple and uses the stateful abstractions discussed above to save the last expired and last evaluated message ids in the state. During recovery the last expired and last evaluated message ids are used to avoid duplicate window evaluations. Tuples with message id lower than last expired id are discarded and tuples with message id between the last expired and last evaluated message ids are fed into the system without activating any triggers. The tuples beyond the last evaluated message ids are processed as usual.

State support in windowing is provided by IStatefulWindowedBolt. User bolts should typically extend BaseStatefulWindowedBolt for doing windowing operation with the framework automatically managing the state of the window.

Example Topology

An example topology in storm starter StatefulWindowingTopology demonstrates the usage of IStatefulWindowedBolt to save the state of the windowing operation to avoid re-computation in case of failures. The framework internally manages the window boundaries and does not invoke execute(TupleWindow inputWindow) for the already evaluated windows in case of restarts during failures.

Acknowledgements

Thanks to Parth Brahmbhatt, Sriharsha Chintalapani, Taylor Goetz, Robert Joseph Evans and the others in the Apache Storm community for providing valuable feedback on the design and reviewing the code. A special thanks to Parth for taking time to patiently review the blog.

20,569 Views
Comments
avatar
New Contributor

Great article!

As far as i cant tell this was not released on 0.10.0. Are u planning on including it on the next release?

avatar
Cloudera Employee

These would be part of the upcoming Apache Storm 1.0 release.

avatar
Explorer

Good article with handful explanations and pictures!

I have some questions:

1. If you are emmmiting the data on streams via collector.emit("streamId1", values), collector.emit("streamId2", values) how do you get your data in the

public void execute(TupleWindow inputWindow)? The nputWindow.get() doesn't have a parameter StreamId?

2. In the above context, does the inputWindow contain data for all the streams emmited to it?

Can you get them, via

for (Tuple tuple : tuplesInWindow) {

tuple.getValue( streamId1, 0);

}

I look forward for your answers?

Regards,

florin

avatar
Cloudera Employee

Hi Florin,

1. If you are emitting the data on streams via collector.emit("streamId1", values), collector.emit("streamId2", values) how do you get your data in the public void execute(TupleWindow inputWindow)? The InputWindow.get() doesn't have a parameter StreamId?

The windowed bolts are added similar to a regular bolt in the topology. So if you want to do the windowing with data from two different streams you can do as follows,

topologyBuilder.setBolt("windowbolt", windowBolt,1).shuffleGrouping("spout1", "stream1").shuffleGrouping("spout2", "stream2");

2. In the above context, does the inputWindow contain data for all the streams emitted to it?

Yes, it will contain data for all the streams. The tuples would contain the stream id where it came from.

for (Tuple tuple : tuplesInWindow.get()) {
  tuple.getSourceStreamId();
}

Hope it helps.

- Arun

avatar
Contributor

Great article! A question: do you know in which HDP realease will be released storm 1.0?

Thanks, Giuseppe

avatar
Master Mentor

right now there are no concrete release dates, I would wait until Hadoop Summit San Jose for any announcements.