01-19-2017
03:13 AM
@Jasper Trident HDFS State does provide an exactly-once guarantee, and de-duplication is taken care of. If a batch is replayed by Trident (due to failures), the Trident state implementation automatically removes duplicates from the current file by copying the data up to the last completed batch to another file. Since this operation involves a lot of data copying, ensure that the data files are rotated at reasonable sizes with FileSizeRotationPolicy and at reasonable intervals with TimedRotationPolicy so that the recovery can complete within topology.message.timeout.secs.
01-17-2017
06:31 AM
@Jasper Right now the HDFS bolt does not mark the current file as "in-progress". A reasonable solution is to use a RotationAction to move the rotated files to a different directory, but it's possible that if the worker crashes in the middle of a rotation, the file may not be moved to the destination (this applies to both core and Trident).
05-28-2016
02:00 AM
Apache Storm is a distributed real-time computation system for processing large volumes of high-velocity data. Debugging a distributed system is inherently hard since there are many moving parts spread across a large number of hosts in a cluster. Tracing failures to a particular component or a host in the system is hard and requires one to collect and analyze a lot of logs and debug/trace processes running in the distributed cluster.
In the past the support for debugging topologies in an Apache Storm cluster was very minimal. Users would have to first turn on debug logs, restart the workers, redeploy the topologies and then collect logs from different worker nodes and analyze them. If they wanted to take a jstack dump or profile the workers, they would have to log in to the worker hosts and run the commands manually, which was painful. If one wanted to inspect the tuples flowing through the topology pipeline, they would have to add special “debug” bolts to log the tuples to some log and then remove the debug bolts from the topology before putting it back into production.
In this article we will go through the new features added in Apache Storm 1.0 that make the various aspects of debugging a Storm topology much easier and more user friendly.
1. Dynamic Log Levels
Storm allows users and administrators to dynamically change the log level settings of a running topology both from the Storm UI and the command line. None of the Storm processes need to be restarted for the settings to take effect. Users can also specify an optional timeout after which those changes will be automatically reverted. The resulting log files are also easily searchable from the Storm UI and logviewer service.
The log level settings apply the same way as you'd expect from log4j. If you set the log level of a parent logger, the child loggers start using that level (unless the children have a more restrictive level already).
1.1. Enabling via Storm UI
In order to set a level, click on a running topology and then click on “Change Log Level” in the Topology Actions section.
Figure 1: Changing the log levels of a topology.
Next, provide the logger name, select the level you want (e.g. WARN), and a timeout in seconds (or 0 if not needed). Then click on “Add”.
In the example above (Figure 1), while running a storm-starter topology, the root logger is set to ERROR and storm.starter is set to DEBUG. That way one can look more specifically into the debug logs from the “storm.starter” packages while the other logs are suppressed.
To clear the log level click on the “Clear” button. This reverts the log level back to what it was before you added the setting. The log level line will disappear from the UI.
Figure 2: Clearing the dynamic log levels.
1.2. Using the CLI
The log level can be set via the command line as follows,
./bin/storm set_log_level [topology name] -l [logger name]=[LEVEL]:[TIMEOUT]
For example:
./bin/storm set_log_level my_topology -l ROOT=DEBUG:30
Sets the ROOT logger to DEBUG for 30 seconds.
./bin/storm set_log_level my_topology -r ROOT
Clears the ROOT logger dynamic log level, resetting it to its original value.
See JIRA STORM-412 for more details.
2. Topology Event logging
The topology event inspector provides the ability to view the tuples as they flow through different stages in a Storm topology. This could be useful for inspecting the tuples emitted from a spout or a bolt in the topology pipeline while the topology is running, without stopping or redeploying the topology. The normal flow of tuples from the spouts to the bolts is not affected by turning on event logging.
2.1. Enabling event logging
Note: Event logging is disabled by default and needs to be enabled first by setting the storm config "topology.eventlogger.executors" to a non-zero value. Please see the Configuration section for more details.
Events can be logged by clicking the "Debug" button under the topology actions in the topology view. This logs the tuples from all the spouts and bolts in a topology at the specified sampling percentage.
Figure 3: Enable event logging at topology level.
You could also enable event logging at a specific spout or bolt level by going to the corresponding component page and clicking "Debug" under component actions.
Figure 4: Enable event logging at component level.
2.2. Viewing the event logs
The Storm "logviewer" should be running for viewing the logged tuples. If it is not already running, the log viewer can be started by running the "bin/storm logviewer" command from the Storm installation directory. For viewing the tuples, go to the specific spout or bolt component page from the Storm UI and click on the "events" link under the component summary (as highlighted in Figure 4 above).
This would open up a view like below where you can navigate between different pages and view the logged tuples.
Figure 5: Viewing the logged events.
Each line in the event log contains an entry corresponding to a tuple emitted from a specific spout/bolt in a comma separated format:
Timestamp, Component name, Component task-id, MessageId (in case of anchoring), List of emitted values
2.3. Disabling the event logs
Event logging can be disabled at a specific component or at the topology level by clicking "Stop Debug" under the topology or component actions in the Storm UI.
Figure 6: Disable event logging at topology level.
2.4. Configuration
Event logging sends events (tuples) from each component to an internal eventlogger bolt. By default Storm does not start any event logger tasks due to a slight performance degradation when event logging is turned on. This can be changed by setting the below parameter when submitting your topology (either globally in the storm.yaml or using the command line options).
Parameter | Meaning | When to use |
---|---|---|
topology.eventlogger.executors: 0 | No event logger tasks are created (default). | If you don’t intend to inspect tuples and don’t want the slight performance hit. |
topology.eventlogger.executors: 1 | One event logger task for the topology. | If you want to sample a low percentage of tuples from a specific spout or a bolt. This could be the most common use case. |
topology.eventlogger.executors: nil | One event logger task per worker. | If you want to sample the entire topology (all spouts and bolts) at a very high sampling percentage and the tuple rate is very high. |
2.5. Extending event logging
Storm provides the IEventLogger interface, which is used by the event logger bolt to log the events. The default implementation is FileBasedEventLogger, which logs the events to an events.log file (logs/workers-artifacts/<topology-id>/<worker-port>/events.log). Alternative implementations of the IEventLogger interface can be added to extend the event logging functionality (say build a search index or log the events into a database).
/**
 * EventLogger interface for logging the event info to a sink like log file or db
 * for inspecting the events via UI for debugging.
 */
public interface IEventLogger {
    void prepare(Map stormConf, TopologyContext context);
    /**
     * Invoked when the {@link EventLoggerBolt} receives a tuple from the spouts or bolts that
     * have event logging enabled.
     *
     * @param e the event
     */
    void log(EventInfo e);
    /**
     * Invoked when the event logger bolt is cleaned up
     */
    void close();
}
See JIRA STORM-954 for more details.
3. Distributed Log Search
Another improvement to Storm's UI is the addition of distributed log search. This capability allows users to search across all log files of a specific topology, including archived logs. The search results will include matches from all supervisor nodes.
Figure 7: A distributed log search output
This feature is very helpful to search for patterns across workers or supervisors of a topology. Similar log search is also supported within specific worker log files via the UI.
See JIRA STORM-902 for more details.
4. Dynamic Worker Profiling
Users can now request worker profile data directly from the Storm UI, including heap dumps, JStack output and JProfile recordings, without restarting their topologies. The generated files are then available for download via the log viewer for offline analysis with various debugging tools. It is also now possible to restart workers via the Storm UI.
Figure 8: Profiling and debugging workers
The output can be viewed from the worker log viewer UI by switching to the appropriate file.
Figure 9: Jstack output viewed from the Storm UI
See JIRA STORM-1157 for more details.
5. Conclusion
We covered some of the new features added in Apache Storm 1.0 that should make the various aspects of debugging a Storm topology much easier, more intuitive and more user friendly. These features allow Apache Storm users to quickly troubleshoot their topologies whenever any issues are encountered.
03-23-2016
08:20 AM
Hi Florin,
1. If you are emitting the data on streams via collector.emit("streamId1", values), collector.emit("streamId2", values) how do you get your data in the public void execute(TupleWindow inputWindow)? The InputWindow.get() doesn't have a parameter StreamId?
The windowed bolts are added to the topology like a regular bolt. So if you want to do the windowing with data from two different streams you can do as follows,
topologyBuilder.setBolt("windowbolt", windowBolt, 1)
    .shuffleGrouping("spout1", "stream1")
    .shuffleGrouping("spout2", "stream2");
2. In the above context, does the inputWindow contain data for all the streams emitted to it?
Yes, it will contain data for all the streams. Each tuple carries the id of the stream it came from.
for (Tuple tuple : inputWindow.get()) {
    String streamId = tuple.getSourceStreamId();
}
Hope it helps.
- Arun
03-02-2016
07:29 AM
These would be part of the upcoming Apache Storm 1.0 release.
02-03-2016
07:54 PM
Introduction
Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data. In this article we explore the support for windowing operations and stateful processing that was recently added to Apache Storm.
Windowing support
Windowing computation is one of the common use cases in stream processing, where the unbounded stream of data is split into finite sets based on some criteria (e.g. time) and a computation is applied to each group of events. An example would be to compute the top trending Twitter topic in the last hour.
Windowing is primarily used for aggregations, joins, pattern matching and more. Windows can be seen as an in-memory table where events are added and evicted based on some policies.
So far Apache Storm relied on developers to build their own windowing logic. There were no recommended or high-level abstractions that developers could use to define a window in a standard way in a topology.
Windowing support was recently added to Apache Storm. Windows can be specified with the following two parameters,
Window length - the length or duration of the window
Sliding interval - the interval at which the window slides
Storm has support for sliding and tumbling windows based on time duration and/or event count.
Sliding Window
In a sliding window, tuples are grouped into windows and the window slides at every sliding interval, so consecutive windows can overlap. For example, a time duration based sliding window of length 10 secs and sliding interval of 5 seconds is evaluated every 5 seconds.
Here the first window (w1) contains the events that arrived up to the 10th second and the second window (w2) contains the events that arrived between the 5th and 15th second. Also note that events e3 - e6 are part of both windows. When window w2 is evaluated at time t = 15 secs, the events e1 and e2 are expired (dropped out of the event queue).
Tumbling Window
In a tumbling window tuples are grouped in a single window based on time or count. Any tuple belongs to only one of the windows. For example consider a time duration based tumbling window with length 5 secs.
Here the first window (w1) contains the events that arrived up to the 5th second, the second window (w2) contains the events that arrived between the 5th and 10th second, the third window (w3) contains the events between the 10th and 15th second, and so on. The window is evaluated every five seconds and none of the windows overlap.
Window configuration
Storm supports specifying the window length and sliding intervals as a count of the number of tuples or as a time duration or a combination of both.
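To make the sliding and tumbling semantics above concrete, here is a minimal plain-Java sketch that groups arrival times into windows. It does not use Storm's API; the class name, the arrival times, the inclusive window end and the choice to start evaluating once the first full window length has elapsed are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of time-based windows; this is not Storm's implementation. */
class WindowSketch {

    /**
     * Returns the arrival times (in seconds) that fall in the window ending at windowEnd.
     * The window covers (windowEnd - length, windowEnd]; the inclusive end is an assumption.
     */
    static List<Integer> eventsInWindow(List<Integer> arrivals, int windowEnd, int length) {
        List<Integer> result = new ArrayList<>();
        for (int t : arrivals) {
            if (t > windowEnd - length && t <= windowEnd) {
                result.add(t);
            }
        }
        return result;
    }

    /** Evaluates a window every slide seconds up to horizon; slide == length gives a tumbling window. */
    static List<List<Integer>> evaluate(List<Integer> arrivals, int length, int slide, int horizon) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int end = length; end <= horizon; end += slide) {
            windows.add(eventsInWindow(arrivals, end, length));
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in seconds.
        List<Integer> arrivals = Arrays.asList(1, 3, 6, 7, 8, 9, 12, 14);
        System.out.println("sliding 10s/5s: " + evaluate(arrivals, 10, 5, 15));
        System.out.println("tumbling 5s:    " + evaluate(arrivals, 5, 5, 15));
    }
}
```

In the sliding case the arrivals at 6, 7, 8 and 9 seconds show up in both windows, while in the tumbling case every arrival lands in exactly one window.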
The bolt interface IWindowedBolt can be implemented by bolts that need windowing support.
public interface IWindowedBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    /**
     * Process tuples falling within the window and optionally emit
     * new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);
    void cleanup();
}
Every time the window slides (the sliding interval elapses), the execute method is invoked. The TupleWindow parameter gives access to the current tuples in the window, the tuples that expired and the new tuples added since the last window was computed, which can be used for efficient windowing computations.
Bolts that need windowing support would typically extend BaseWindowedBolt, which has the APIs for specifying the window length and sliding intervals. For example,
public class SlidingWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    @Override
    public void execute(TupleWindow inputWindow) {
        for (Tuple tuple : inputWindow.get()) {
            // do the windowing computation
            ...
        }
        collector.emit(new Values(computedValue));
    }
}
The BaseWindowedBolt has APIs to define the window parameters based on time duration, count or a combination of both. The following window configurations are supported.
/*
* Tuple count based sliding window that slides after slidingInterval number of tuples
*/
withWindow(Count windowLength, Count slidingInterval)
/*
* Tuple count based window that slides with every incoming tuple
*/
withWindow(Count windowLength)
/*
* Tuple count based sliding window that slides after slidingInterval time duration
*/
withWindow(Count windowLength, Duration slidingInterval)
/*
* Time duration based sliding window that slides after slidingInterval time duration
*/
withWindow(Duration windowLength, Duration slidingInterval)
/*
* Time duration based window that slides with every incoming tuple
*/
withWindow(Duration windowLength)
/*
* Time duration based sliding window that slides after slidingInterval number of tuples
*/
withWindow(Duration windowLength, Count slidingInterval)
/*
* Count based tumbling window that tumbles after the specified count of tuples
*/
withTumblingWindow(BaseWindowedBolt.Count count)
/*
* Time duration based tumbling window that tumbles after the specified time duration
*/
withTumblingWindow(BaseWindowedBolt.Duration duration)
The windowed bolts can be added to the topology via the TopologyBuilder similar to the normal bolts. For example,
TopologyBuilder builder = new TopologyBuilder();
/*
* A windowed bolt that computes sum over a sliding window with window length of
* 30 events that slides after every 10 events.
*/
builder.setBolt("sum", new WindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
.shuffleGrouping("spout");
Tuple timestamp and out of order tuples
By default the timestamp tracked in the window is the time when the tuple is processed by the bolt, and the window calculations are performed based on this processing timestamp. Storm also has support for tracking windows based on a source generated timestamp. This can be useful for processing events based on the time when they occurred, for example log entries with timestamps that are processed later.
/**
* Specify the tuple field that represents the timestamp as a long value. If this field
* is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName)
The value for the fieldName will be looked up from the incoming tuple and considered for windowing calculations. If this option is specified, all the tuples are expected to contain the timestamp field. If the field is not present in the tuple, an exception will be thrown and the topology will terminate. To fix this, the offending tuple must be manually removed from the source (e.g. Kafka) and the topology must be restarted.
Along with the timestamp field name, a time lag parameter can also be specified which indicates the max time limit for tuples with out of order timestamps.
/**
* Specify the maximum time lag of the tuple timestamp in millis. The tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration)
E.g. if the lag is 5 secs and a tuple t1 arrived with timestamp 06:00:05, no tuples may arrive with a timestamp earlier than 06:00:00. If a tuple arrives with timestamp 05:59:59 after t1 and the window has moved past t1, it will be treated as a late tuple and not processed. Currently the late tuples are ignored and just logged in the worker log files at INFO level.
Watermarks
For processing tuples with a timestamp field, Storm internally computes watermarks based on the incoming tuple timestamps. The watermark is the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a high level this is similar to the watermark concept used by Flink and Google's MillWheel for tracking event based timestamps.
Periodically (default every sec), the watermark timestamps are emitted and this is considered as the clock tick for the window calculation if tuple based timestamps are in use. The interval at which watermarks are emitted can be changed with the below api.
/**
* Specify the watermark event generation interval. Watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval)
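The watermark rule stated above (the minimum, across input streams, of the latest tuple timestamp, minus the lag) can be sketched in a few lines of plain Java. This is an illustration only and not Storm's internal code; the class name, method names and stream ids are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration of the watermark rule; not Storm's internal code. */
class WatermarkSketch {
    // Latest tuple timestamp (millis) seen so far on each input stream.
    private final Map<String, Long> latestTsByStream = new HashMap<>();
    private final long lagMs;

    WatermarkSketch(long lagMs) {
        this.lagMs = lagMs;
    }

    /** Records a tuple timestamp for a stream, keeping only the largest value seen. */
    void track(String streamId, long tupleTsMs) {
        latestTsByStream.merge(streamId, tupleTsMs, Long::max);
    }

    /** Watermark = minimum over all streams of the latest timestamp, minus the lag. */
    long watermark() {
        if (latestTsByStream.isEmpty()) {
            throw new IllegalStateException("no tuples tracked yet");
        }
        long min = Long.MAX_VALUE;
        for (long ts : latestTsByStream.values()) {
            min = Math.min(min, ts);
        }
        return min - lagMs;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(5_000);
        // Hypothetical stream ids; timestamps are millis relative to an arbitrary origin.
        sketch.track("stream1", 36_000);
        sketch.track("stream2", 40_000);
        sketch.track("stream1", 30_000); // older tuple, does not lower the latest timestamp
        System.out.println("watermark = " + sketch.watermark());
    }
}
```

The slowest stream determines the watermark, which is why a stalled input holds back window evaluation for the whole bolt.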
When a watermark is received, all windows up to that timestamp will be evaluated. For example, consider tuple timestamp based processing with following window parameters,
Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s
Current timestamp = 09:00:00
Tuples e1(06:00:03), e2(06:00:05), e3(06:00:07), e4(06:00:18), e5(06:00:26), e6(06:00:36) are received between 09:00:00 and 09:00:01.
At time t = 09:00:01, watermark w1 = 06:00:31 is emitted since no tuples earlier than 06:00:31 can arrive.
Three windows will be evaluated. The first window end ts (06:00:10) is computed by taking the earliest event timestamp (06:00:03) and computing the ceiling based on the sliding interval (10s).
05:59:50 - 06:00:10 with tuples e1, e2, e3
06:00:00 - 06:00:20 with tuples e1, e2, e3, e4
06:00:10 - 06:00:30 with tuples e4, e5
e6 is not evaluated since the watermark timestamp 06:00:31 is earlier than the tuple ts 06:00:36.
Tuples e7(08:00:25), e8(08:00:26), e9(08:00:27), e10(08:00:39) are received between 09:00:01 and 09:00:02.
At time t = 09:00:02 another watermark w2 = 08:00:34 is emitted since no tuples earlier than 08:00:34 can arrive now. Three windows will be evaluated,
06:00:20 - 06:00:40 with tuples e5, e6 (from earlier batch)
06:00:30 - 06:00:50 with tuple e6 (from earlier batch)
08:00:10 - 08:00:30 with tuples e7, e8, e9
e10 is not evaluated since the tuple ts 08:00:39 is beyond the watermark time 08:00:34.
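The first batch of the example can be replayed with a small plain-Java sketch. It is not Storm's trigger policy: it assumes a single input stream, treats a window as covering (end - length, end], and does not model the skipping of empty windows across the time gap seen in the second batch. Timestamps are written as seconds after 06:00:00, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java walk-through of the first batch in the example; not Storm's internal code. */
class WatermarkWindowSketch {

    /** Rounds ts up to the next multiple of slide (the ceiling based on the sliding interval). */
    static long alignUp(long ts, long slide) {
        return ((ts + slide - 1) / slide) * slide;
    }

    /**
     * Maps each window end (at or before the watermark) to the timestamps inside that window.
     * The watermark is the latest timestamp minus the lag, assuming one input stream.
     */
    static Map<Long, List<Long>> evaluate(List<Long> tupleTs, long length, long slide, long lag) {
        long watermark = Collections.max(tupleTs) - lag;
        Map<Long, List<Long>> windows = new LinkedHashMap<>();
        for (long end = alignUp(Collections.min(tupleTs), slide); end <= watermark; end += slide) {
            List<Long> inWindow = new ArrayList<>();
            for (long ts : tupleTs) {
                if (ts > end - length && ts <= end) {
                    inWindow.add(ts);
                }
            }
            windows.put(end, inWindow);
        }
        return windows;
    }

    public static void main(String[] args) {
        // e1..e6 from the example, as seconds after 06:00:00.
        List<Long> batch = Arrays.asList(3L, 5L, 7L, 18L, 26L, 36L);
        // Window length 20s, sliding interval 10s, max lag 5s.
        evaluate(batch, 20, 10, 5).forEach((end, tuples) ->
            System.out.println("window ending at +" + end + "s: " + tuples));
    }
}
```

With these inputs the sketch yields three windows ending at +10s, +20s and +30s, holding e1 to e3, e1 to e4, and e4 and e5 respectively, and leaves e6 (+36s) for a later watermark.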
The window calculation considers the time gaps and computes the windows based on the tuple timestamp.
Guarantees
The windowing functionality in Storm core currently provides an at-least-once guarantee. The values emitted from the bolt’s execute(TupleWindow inputWindow) method are automatically anchored to all the tuples in the inputWindow. The downstream bolts are expected to ack the received tuple (i.e. the tuple emitted from the windowed bolt) to complete the tuple tree. If not, the tuples will be replayed and the windowing computation will be re-evaluated.
The tuples in the window are automatically acknowledged when they fall out of the window after windowLength + slidingInterval. Note that the configuration topology.message.timeout.secs should be more than windowLength + slidingInterval for time based windows; otherwise the tuples will time out and get replayed, which can result in duplicate evaluations. For count based windows, the configuration should be adjusted such that windowLength + slidingInterval tuples can be received within the timeout period.
Example topology
An example topology in storm-starter, SlidingWindowTopology, shows how to use the APIs to compute a sliding window sum and a tumbling window average.
Future improvements
Currently the windowing computations are based on count or time intervals, i.e. the windows are evaluated (triggered) after a specified time interval or after a specified number of tuples arrive. Though this addresses most of the common use cases, there are a few cases where one might want to trigger the windowing computation based on some custom criteria. For example, for events tracking the mouse movement, the window could be triggered after a predefined period of inactivity (user session). The ability for users to define custom triggers and evictors can be added in future for this purpose.
Currently the windowing support is available only in core Storm; this can be extended to provide windowing support in Storm Trident as well.
Saving the window state
One of the issues with windowing is that the tuples cannot be acked until they completely fall out of the window. For instance, consider a one hour window that slides every minute. The tuples in the window are evaluated (passed to the bolt's execute) every minute, but the tuples that arrived in the first minute are acked only after one hour and one minute. So if the system crashes just after one hour, Storm’s acking mechanism replays all the tuples (from the 0th to the 60th minute) and causes every window to be re-evaluated. That is, the bolt’s execute method will be invoked again with the same sets of tuples (60 times). One way to avoid this would be to save the state of the window evaluation (i.e. track the tuples that are already evaluated) in some external durable state and use this info to prune the duplicate window evaluations during recovery.
Next we will explore the Storm support for state management and how it can be used to avoid duplicate window evaluations.
State Management
Stateful abstractions allow Storm bolts to store and retrieve the state of their computation. So far Storm's support for stateful computations was limited to the Trident APIs, and in Storm core users had to write custom logic in bolts to save the state of their computations. With the addition of state management to core Storm, the framework automatically and periodically snapshots the state of the bolts across the topology in a consistent manner. There is a default in-memory based state implementation and also a Redis backed implementation that provides state persistence.
Bolts that require their state to be managed and persisted by the framework should implement the IStatefulBolt interface or extend the BaseStatefulBolt and implement the void initState(T state) method. The initState method is invoked by the framework during the bolt initialization with the previously saved state of the bolt. This is invoked after prepare but before the bolt starts processing any tuples.
Currently the only kind of State implementation that is supported is KeyValueState which provides key-value mapping.
For example a word count bolt could use the key value state abstraction for the word counts as follows.
Extend the BaseStatefulBolt and type parameterize it with KeyValueState which would store the mapping of word to count.
The bolt gets initialized with its previously saved state in the initState method. This will contain the word counts last committed by the framework during the previous run.
In the execute method, update the word count.
public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Integer>> {
    private KeyValueState<String, Integer> wordCounts;
    ...
    @Override
    public void initState(KeyValueState<String, Integer> state) {
        // called by the framework with the previously saved state
        wordCounts = state;
    }
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = wordCounts.get(word, 0);
        count++;
        wordCounts.put(word, count);
        // anchor the emitted tuple to the input and then ack the input
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
    }
    ...
}
The framework periodically checkpoints the state of the bolt (default every second). The frequency can be changed by setting the storm config topology.state.checkpoint.interval.ms.
For state persistence, use a state provider that supports persistence by setting the topology.state.provider in the storm config. E.g. for Redis based key-value state implementation topology.state.provider can be set to org.apache.storm.redis.state.RedisKeyValueStateProvider in storm.yaml. The provider implementation jar should be in the class path, which in this case means putting the storm-redis-*.jar in the extlib directory.
The state provider properties can be overridden by setting topology.state.provider.config. For Redis state this is a json config with the following properties.
{
  "keyClass": "Optional fully qualified class name of the Key type.",
  "valueClass": "Optional fully qualified class name of the Value type.",
  "keySerializerClass": "Optional Key serializer implementation class.",
  "valueSerializerClass": "Optional Value Serializer implementation class.",
  "jedisPoolConfig": {
    "host": "localhost",
    "port": 6379,
    "timeout": 2000,
    "database": 0,
    "password": "xyz"
  }
}
Checkpoint mechanism
A checkpoint is triggered by an internal checkpoint spout at the specified topology.state.checkpoint.interval.ms. If there is at least one IStatefulBolt in the topology, the checkpoint spout is automatically added by the topology builder. For stateful topologies, the topology builder wraps the IStatefulBolt in a StatefulBoltExecutor which handles the state commits on receiving the checkpoint tuples. The non-stateful bolts are wrapped in a CheckpointTupleForwarder which just forwards the checkpoint tuples so that the checkpoint tuples can flow through the topology DAG. The checkpoint tuples flow through a separate internal stream, namely $checkpoint. The topology builder wires the checkpoint stream across the whole topology with the checkpoint spout at the root.
At checkpoint intervals the checkpoint tuples are emitted by the checkpoint spout. On receiving a checkpoint tuple, the state of the bolt is saved and then the checkpoint tuple is forwarded to the next component. Each bolt waits for the checkpoint to arrive on all its input streams before it saves its state so that the state represents a consistent state across the topology. Once the checkpoint spout receives the ACK from all the bolts, the state commit is complete and the transaction is recorded as committed by the checkpoint spout. The checkpoint mechanism builds on top of Storm's existing acking mechanism to replay the tuples and uses concepts from the asynchronous snapshot algorithm used by Flink and the Chandy-Lamport algorithm for distributed snapshots. The checkpointing internally uses a three phase commit protocol with a prepare and commit phase so that the state across the topology is saved in a consistent and atomic manner.
Recovery
The recovery phase is triggered when the topology is started for the first time. If the previous transaction was not successfully prepared, a rollback message is sent across the topology so that if a bolt has some prepared transactions they can be discarded. If the previous transaction was prepared successfully but not committed, a commit message is sent across the topology so that the prepared transactions can be committed. After these steps are complete, the bolts are initialized with the state.
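The decision described above can be written down as a small plain-Java sketch. The enum and method names here are hypothetical and chosen for illustration; they are not taken from Storm's code, and the sketch only models which messages would be sent, not how they travel through the topology.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the recovery decision; all names are hypothetical. */
class RecoverySketch {
    /** How far the previous checkpoint transaction got before the restart. */
    enum TxState { PREPARING, PREPARED, COMMITTED }

    /** Messages the checkpoint source would send during recovery. */
    enum Action { ROLLBACK, COMMIT, INIT_STATE }

    /** Returns the recovery messages, in order, for the given last known transaction state. */
    static List<Action> recover(TxState last) {
        List<Action> actions = new ArrayList<>();
        if (last == TxState.PREPARING) {
            actions.add(Action.ROLLBACK);   // prepare did not complete: discard prepared state
        } else if (last == TxState.PREPARED) {
            actions.add(Action.COMMIT);     // prepared but not committed: finish the commit
        }
        actions.add(Action.INIT_STATE);     // finally the bolts are initialized with the state
        return actions;
    }

    public static void main(String[] args) {
        for (TxState state : TxState.values()) {
            System.out.println(state + " -> " + recover(state));
        }
    }
}
```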
The recovery is also triggered if one of the bolts fails to acknowledge the checkpoint message or, say, a worker crashed in the middle. Thus when the worker is restarted by the supervisor, the checkpoint mechanism makes sure that the bolt gets initialized with its previous state and the checkpointing continues from the point where it left off.
Guarantee
Storm relies on the acking mechanism to replay tuples in case of failures. It is possible that the state is committed but the worker crashes before acking the tuples. In this case the tuples are replayed causing duplicate state updates. Also currently the StatefulBoltExecutor continues to process the tuples from a stream after it has received a checkpoint tuple on one stream while waiting for checkpoint to arrive on other input streams for saving the state. This can also cause duplicate state updates during recovery.
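The second source of duplicates is easier to see with a toy model. The following plain-Java sketch, with hypothetical class and method names and no Storm dependency, tracks on which input streams the checkpoint tuple has arrived and saves its state only once it has been seen on all of them, while ordinary tuples continue to update the state in the meantime.

```java
import java.util.HashSet;
import java.util.Set;

/** Plain-Java illustration of waiting for a checkpoint on all inputs; names are hypothetical. */
class CheckpointAlignmentSketch {
    private final Set<String> inputStreams;
    private final Set<String> checkpointSeenOn = new HashSet<>();
    private long count;            // live state: number of tuples processed so far
    private long savedCount = -1;  // state captured by the last completed checkpoint

    CheckpointAlignmentSketch(Set<String> inputStreams) {
        this.inputStreams = new HashSet<>(inputStreams);
    }

    /** Tuples keep updating the live state even while a checkpoint is pending. */
    void onTuple(String streamId) {
        count++;
    }

    /** Saves the state once the checkpoint tuple has arrived on every input stream. */
    boolean onCheckpoint(String streamId) {
        checkpointSeenOn.add(streamId);
        if (!checkpointSeenOn.containsAll(inputStreams)) {
            return false; // still waiting for the other inputs
        }
        savedCount = count;
        checkpointSeenOn.clear();
        return true;
    }

    long savedCount() {
        return savedCount;
    }

    public static void main(String[] args) {
        Set<String> inputs = new HashSet<>();
        inputs.add("a");
        inputs.add("b");
        CheckpointAlignmentSketch bolt = new CheckpointAlignmentSketch(inputs);
        bolt.onTuple("a");
        bolt.onTuple("b");
        System.out.println(bolt.onCheckpoint("a")); // checkpoint seen on "a" only
        bolt.onTuple("a");                          // arrives behind the checkpoint on "a"
        System.out.println(bolt.onCheckpoint("b")); // checkpoint now seen on both inputs
        System.out.println(bolt.savedCount());
    }
}
```

The saved count includes the tuple that arrived on stream "a" after that stream's checkpoint tuple. If that tuple is later replayed during recovery, it is counted a second time on top of the saved state.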
The state abstraction does not eliminate duplicate evaluations and currently provides only an at-least-once guarantee. In order to provide the at-least-once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once they are processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the BaseBasicBolt. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing, as in the WordCountBolt example in the State Management section above.
IStateful bolt hooks
The IStateful bolt interface provides hook methods in which stateful bolts can implement some custom actions.
/**
* This is a hook for the component to perform some actions just before the
* framework commits its state.
*/
void preCommit(long txid);
/**
* This is a hook for the component to perform some actions just before the
* framework prepares its state.
*/
void prePrepare(long txid);
/**
* This is a hook for the component to perform some actions just before the
* framework rolls back the prepared state.
*/
void preRollback();
This is optional and stateful bolts are not expected to provide any implementation. This is provided so that other system level components can be built on top of the stateful abstractions where we might want to take some actions before the stateful bolt's state is prepared, committed or rolled back.
Providing custom state implementations
Currently the only kind of State implementation supported is KeyValueState which provides key-value mapping.
Custom state implementations should provide implementations for the methods defined in the State interface. These are the void prepareCommit(long txid), void commit(long txid) and rollback() methods. The commit() method is optional and is useful if the bolt manages the state on its own. This is currently used only by the internal system bolts, e.g. the CheckpointSpout, to save their state. KeyValueState implementations should also implement the methods defined in the KeyValueState interface.
State provider
The framework instantiates the state via the corresponding StateProvider implementation. A custom state should also provide a StateProvider implementation which can load and return the state based on the namespace. Each state belongs to a unique namespace. The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding State implementation should be available in the class path of Storm (by placing them in the extlib directory).
Stateful windowing
The windowing implementation in storm core acknowledges the tuples in the window only when they fall out of the window. For example consider a window configuration with window length 5 mins and sliding interval 1 min. The tuples that arrived between 0 - 1 min are acked only when the window slides past 1 min i.e at the 6th minute.
If the system crashes, tuples e1 to e8 get replayed (assuming the acks for e1 and e2 did not reach the acker) and w1, w2 and w3 will get re-evaluated. Stateful windowing tries to minimize the duplicate window evaluations by saving the last evaluated and last expired state of the window. Stateful windowing expects a monotonically increasing message id to be part of the tuple and uses the stateful abstractions discussed above to save the last expired and last evaluated message ids in the state. During recovery the last expired and last evaluated message ids are used to avoid duplicate window evaluations. Tuples with a message id lower than the last expired id are discarded, and tuples with a message id between the last expired and last evaluated message ids are fed into the system without activating any triggers. The tuples beyond the last evaluated message id are processed as usual.
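The three cases in the recovery rule can be captured in a short plain-Java sketch. The names are hypothetical, and how a message id exactly equal to one of the two saved ids is handled is an assumption of this sketch rather than something stated above.

```java
/** Plain-Java illustration of the recovery rule for replayed tuples; names are hypothetical. */
class StatefulWindowRecoverySketch {
    enum Handling { DISCARD, ADD_WITHOUT_TRIGGER, PROCESS_NORMALLY }

    /**
     * Classifies a replayed tuple by its message id.
     * Assumption: ids equal to lastExpired are discarded and ids equal to
     * lastEvaluated are added without firing a trigger.
     */
    static Handling classify(long msgId, long lastExpired, long lastEvaluated) {
        if (msgId <= lastExpired) {
            return Handling.DISCARD;             // already fell out of an evaluated window
        }
        if (msgId <= lastEvaluated) {
            return Handling.ADD_WITHOUT_TRIGGER; // rebuilds the window without re-evaluating it
        }
        return Handling.PROCESS_NORMALLY;        // never evaluated before the crash
    }

    public static void main(String[] args) {
        long lastExpired = 2;
        long lastEvaluated = 8;
        for (long id = 1; id <= 10; id++) {
            System.out.println("msg " + id + " -> " + classify(id, lastExpired, lastEvaluated));
        }
    }
}
```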
State support in windowing is provided by IStatefulWindowedBolt. User bolts should typically extend BaseStatefulWindowedBolt for doing windowing operations with the framework automatically managing the state of the window.
Example Topology
An example topology in storm-starter, StatefulWindowingTopology, demonstrates the usage of IStatefulWindowedBolt to save the state of the windowing operation to avoid re-computation in case of failures. The framework internally manages the window boundaries and does not invoke execute(TupleWindow inputWindow) for the already evaluated windows in case of restarts during failures.
Acknowledgements
Thanks to Parth Brahmbhatt, Sriharsha Chintalapani, Taylor Goetz, Robert Joseph Evans and the others in the Apache Storm community for providing valuable feedback on the design and reviewing the code. A special thanks to Parth for taking time to patiently review the blog.