Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Contributor

Introduction

Trident processes a stream in batches of tuples for a defined topology. Currently, there is no windowing support of tuples in a trident stream. This section describes how windowing is supported in trident.

Below types of windows can be defined for batches of tuples based on processing time or count.

  1. Tumbling window
  2. Sliding window

Tumbling window

Tuples are grouped in a single window based on processing time or count. Any tuple belongs to only one of the windows. Below is the new API on org.apache.storm.trident.Stream.

/**Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
*/

public Stream tumblingWindow(intwindowCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields,
Aggregator aggregator,
Fields functionFields)

/** * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration} */
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration,
WindowsStoreFactory windowStoreFactory,
Fields inputFields,
Aggregator aggregator,
Fields functionFields)

Sliding window

Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window. Below is the new API on org.apache.storm.trident.Stream.

/**
Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples * and slides the window after {@code slideCount}. */


public Stream slidingWindow(intwindowCount,
intslideCount,
WindowsStoreFactory windowStoreFactory,
Fields inputFields,
Aggregator aggregator,
Fields functionFields)
/** * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval} * and completes a window at {@code windowDuration} */
public Stream slidingWindow( BaseWindowedBolt.Duration windowDuration,
BaseWindowedBolt.Duration slidingInterval,
WindowsStoreFactory windowStoreFactory,
Fields inputFields,
Aggregator aggregator,
Fields functionFields)

Examples of tumbling and sliding windows can be found here

Common windowing API and implementation details

Below is the common windowing API on org.apache.storm.trident.Stream which takes WindowConfig for any supported windowing configurations.

public Stream window(WindowConfig windowConfig,

Fields inputFields,

Aggregator aggregator,

Fields functionFields)

windowConfig can be any of the below.

- SlidingCountWindow of(intwindowCount, intslidingCount)

- SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration,

BaseWindowedBolt.Duration slidingDuration)

- TumblingCountWindow of(intwindowLength)

- TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength)

Trident windowing APIs need WindowsStoreFactory to store received tuples and aggregated values.

Currently, a basic implementation for HBase is given with HBaseWindowsStoreFactory and HBaseWindowsStore. It can further be extended to address other usecases.

/**
 * Factory to create instances of {@code WindowsStore}.
 */
public interface WindowsStoreFactory extends Serializable {
    public WindowsStore create();
}
/**
 * Store for storing window related entities like windowed tuples, triggers etc.
 *
 */
publicinterfaceWindowsStoreextendsSerializable {
	public Object get(String key);
	public Iterable<Object> get(List<String>keys);
	public Iterable<String> getAllKeys();
	public void put(String key, Object value);
	public void putAll(Collection<Entry>entries);
	public void remove(String key);
	public void removeAll(Collection<String>keys);
	public void shutdown();
	/**
	* This class wraps key and value objects which can be passed to {@code putAll} method.
	*/
	public static class Entry implements Serializable {
	public final String key;
	public final Object value;
...


}

Windowing operation in trident stream is a TridentProcessor implementation which has the below lifecycle for each batch of the tuples received.

// This is invoked when a new batch of tuples is received.

void startBatch(ProcessorContext processorContext);

// This is invoked for each tuple of a batch.

void execute(ProcessorContext processorContext,String streamId,TridentTuple tuple);

// This is invoked for a batch to make it complete. All the tuples of this batch would have been already invoked with execute(ProcessorContext processorContext, String streamId, TridentTuple tuple)

void finishBatch(ProcessorContext processorContext);

Each tuple is received in window operation through WindowTridentProcessor#execute ( ProcessorContext processorContext, String streamId, TridentTuple tuple) and these tuples are accumulated for each batch. When a batch is finished respective tuple information is added to the window and tuples are saved in the configured WindowsStore. Bolts for respective window operations fire a trigger according to the given windowing configuration(like tumbling/sliding count or time). These triggers would compute the aggregated result according to the given Aggregator. These results are emitted as part of the current batch if it exists. When a trigger is fired outside WindowTridentProcessor#finishBatch invocation then those triggers are stored in the given WindowsStore and emit as part of the next immediate batch from that window’s processor.

Sample application

Example of using HBaseWindowStoreFactory for windowing can be seen below.

    // window-state table should already be created with cf:tuples column 
    HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), 
                                                                               "window-state", 
                                                                               "cf".getBytes("UTF-8"), 
                                                                               "tuples".getBytes("UTF-8"));
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 
                                                3, 
                                                new Values("the cow jumped over the moon"), 
                                                new Values("the man went to the store and bought some candy"), 
                                                new Values("four score and seven years ago"), 
                                                new Values("how many apples can you eat"), 
                                                new Values("to be or not to be the person")); 
    spout.setCycle(true);
    TridentTopology topology = new TridentTopology();
    Stream stream = topology.newStream("spout1", spout)
                            .parallelismHint(16)
                            .each(new Fields("sentence"), new Split(), new Fields("word"))
                            .tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
                            .peek(new Consumer() {
                                @Overridepublicvoid
                                accept(TridentTuple input) {
                                    LOG.info("Received tuple: [{}]", input);
                                }
                            });
    StormTopology stormTopology = topology.build();

Example applications of these APIs are located at TridentHBaseWindowingStoreTopology and TridentWindowingInmemoryStoreTopology

2,192 Views
Comments
avatar
Super Collaborator

Thanks @Satish Duggana! For reference, this and related windowing articles have been incorporated into the Storm Component Guide; see http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_storm-component-guide/content/storm-windo...