Member since 09-15-2015
10 Posts
6 Kudos Received
3 Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 1687 | 08-08-2017 01:52 PM |
 | 3348 | 03-22-2017 04:36 PM |
 | 1445 | 01-17-2017 03:19 PM |
08-08-2017
01:52 PM
1 Kudo
SAM expects messages in the topic to be serialized with the Kafka Avro serializer rather than as raw Avro messages. You should produce messages into the Kafka source's topic with KafkaAvroSerializer. See KafkaAvroSerDesApp for an example of producing messages to and consuming messages from a topic using the schema registry serializer/deserializer.
03-22-2017
04:36 PM
1 Kudo
@Edgar Orendain StringSerializer does not use schema-registry; it simply converts the string to bytes using String#getBytes(encoding). It seems the schema registry URL configured on the producer is not reachable. You may want to check whether the given URL is right. Did you set the property with key SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name() to the respective schema-registry URL?
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// rootUrl has the form http://<host>:<port>/api/v1
config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), rootUrl);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
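Since a wrong registry URL is the suspected cause here, a quick shape check before building the producer can rule out simple mistakes. The following is only a sketch using the JDK: the class and method names are hypothetical, accepting https as well as http is an assumption, and it checks the form of the URL, not whether the registry is actually reachable.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of the schema registry client.
final class RegistryUrlCheck {

    // Returns true when rootUrl looks like http(s)://<host>:<port>/api/v1.
    // Only the shape is checked; no network call is made.
    static boolean looksLikeRegistryRootUrl(String rootUrl) {
        if (rootUrl == null) {
            return false;
        }
        try {
            URI uri = new URI(rootUrl.trim());
            String scheme = uri.getScheme();
            boolean httpScheme = "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
            // Tolerate trailing slashes such as ".../api/v1/".
            String path = uri.getPath() == null ? "" : uri.getPath().replaceAll("/+$", "");
            return httpScheme
                    && uri.getHost() != null
                    && uri.getPort() > 0
                    && "/api/v1".equals(path);
        } catch (URISyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikeRegistryRootUrl("http://localhost:9090/api/v1"));
        System.out.println(looksLikeRegistryRootUrl("http://localhost:9090"));
    }
}
```

A URL that passes this check can still point at the wrong host or a stopped service, so it complements rather than replaces checking the URL by hand.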
01-17-2017
03:19 PM
1 Kudo
You can get the SchemaMetadata for a given schema name using schemaRegistryClient with the API below.
SchemaMetadata schemaMetadata = schemaRegistryClient.getSchemaMetadataInfo(schemaName).getSchemaMetadata();
11-16-2016
11:33 AM
3 Kudos
Introduction

Trident processes a stream in batches of tuples for a defined topology. Currently, there is no windowing support of tuples in a trident stream. This section describes how windowing is supported in trident. The following types of windows can be defined for batches of tuples, based on processing time or count.

- Tumbling window
- Sliding window

Tumbling window

Tuples are grouped in a single window based on processing time or count. Any tuple belongs to only one of the windows. Below is the new API on org.apache.storm.trident.Stream.

/**
 * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
 */
public Stream tumblingWindow(int windowCount,
                             WindowsStoreFactory windowStoreFactory,
                             Fields inputFields,
                             Aggregator aggregator,
                             Fields functionFields)

/**
 * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration}
 */
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration,
                             WindowsStoreFactory windowStoreFactory,
                             Fields inputFields,
                             Aggregator aggregator,
                             Fields functionFields)
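
To make the count-based tumbling semantics concrete, here is a small plain-Java simulation that does not use Storm at all. The class and method names are hypothetical, and dropping a trailing window that never fills up is a simplifying assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone illustration; this is not Storm code.
final class TumblingCountSketch {

    // Splits the input into consecutive, non-overlapping windows of windowCount items.
    // A trailing partial window is not emitted because it never filled up.
    static <T> List<List<T>> tumble(List<T> tuples, int windowCount) {
        if (windowCount <= 0) {
            throw new IllegalArgumentException("windowCount must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start + windowCount <= tuples.size(); start += windowCount) {
            windows.add(new ArrayList<>(tuples.subList(start, start + windowCount)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "cow", "jumped", "over", "the");
        // Every word lands in at most one window.
        for (List<String> window : tumble(words, 2)) {
            System.out.println(window + " count=" + window.size());
        }
    }
}
```

Each input element appears in at most one emitted window, which is the property that distinguishes a tumbling window from a sliding one.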

Sliding window

Tuples are grouped in windows, and the window slides at every sliding interval. A tuple can belong to more than one window. Below is the new API on org.apache.storm.trident.Stream.

/**
 * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
 * and slides the window after {@code slideCount}.
 */
public Stream slidingWindow(int windowCount,
                            int slideCount,
                            WindowsStoreFactory windowStoreFactory,
                            Fields inputFields,
                            Aggregator aggregator,
                            Fields functionFields)

/**
 * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval}
 * and completes a window at {@code windowDuration}
 */
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration,
                            BaseWindowedBolt.Duration slidingInterval,
                            WindowsStoreFactory windowStoreFactory,
                            Fields inputFields,
                            Aggregator aggregator,
                            Fields functionFields)
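
The overlap between sliding windows can be seen in a plain-Java simulation that does not depend on Storm. The names are hypothetical, and the sketch assumes that a window is evaluated after every slideCount items and holds at most the last windowCount items seen so far; the real implementation may treat the first, not yet full, windows differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone illustration; this is not Storm code.
final class SlidingCountSketch {

    // Emits one window after every slideCount items. Each window holds
    // the most recent items, up to windowCount of them (assumption).
    static <T> List<List<T>> slide(List<T> tuples, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int end = slideCount; end <= tuples.size(); end += slideCount) {
            int start = Math.max(0, end - windowCount);
            windows.add(new ArrayList<>(tuples.subList(start, end)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> tuples = Arrays.asList(1, 2, 3, 4, 5, 6);
        // With windowCount=4 and slideCount=2, items 3 and 4 fall into two windows.
        for (List<Integer> window : slide(tuples, 4, 2)) {
            System.out.println(window);
        }
    }
}
```

When slideCount equals windowCount the windows no longer overlap, which matches the tumbling case.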

Examples of tumbling and sliding windows can be found here

Common windowing API and implementation details

Below is the common windowing API on org.apache.storm.trident.Stream, which takes a WindowConfig for any of the supported windowing configurations.

public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields)

windowConfig can be any of the below.

- SlidingCountWindow of(int windowCount, int slidingCount)
- SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration)
- TumblingCountWindow of(int windowLength)
- TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength)

Trident windowing APIs need WindowsStoreFactory to store received tuples and aggregated values. Currently, a basic implementation for HBase is given with HBaseWindowsStoreFactory and HBaseWindowsStore. It can be extended further to address other use cases.

/**
* Factory to create instances of {@code WindowsStore}.
*/
public interface WindowsStoreFactory extends Serializable {
public WindowsStore create();
}

/**
 * Store for storing window related entities like windowed tuples, triggers etc.
 *
 */
public interface WindowsStore extends Serializable {
    public Object get(String key);
    public Iterable<Object> get(List<String> keys);
    public Iterable<String> getAllKeys();
    public void put(String key, Object value);
    public void putAll(Collection<Entry> entries);
    public void remove(String key);
    public void removeAll(Collection<String> keys);
    public void shutdown();

    /**
     * This class wraps key and value objects which can be passed to {@code putAll} method.
     */
    public static class Entry implements Serializable {
        public final String key;
        public final Object value;
        ...
    }
}
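
To show what extending the store to another backend might involve, here is a minimal map-backed sketch. It is self-contained and does not compile against Storm: the nested Store interface and Entry class are local stand-ins that copy the method shapes listed above, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-alone illustration; this is not Storm code.
final class InMemoryStoreSketch {

    // Local stand-in mirroring the method shapes of WindowsStore.
    interface Store {
        Object get(String key);
        Iterable<Object> get(List<String> keys);
        Iterable<String> getAllKeys();
        void put(String key, Object value);
        void putAll(Collection<Entry> entries);
        void remove(String key);
        void removeAll(Collection<String> keys);
        void shutdown();
    }

    // Local stand-in for the key/value wrapper passed to putAll.
    static final class Entry {
        final String key;
        final Object value;

        Entry(String key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    // Map-backed store. ConcurrentHashMap rejects null keys and null values.
    static final class MapStore implements Store {
        private final Map<String, Object> map = new ConcurrentHashMap<>();

        public Object get(String key) { return map.get(key); }

        public Iterable<Object> get(List<String> keys) {
            List<Object> values = new ArrayList<>();
            for (String key : keys) {
                values.add(map.get(key)); // null marks a missing key
            }
            return values;
        }

        public Iterable<String> getAllKeys() { return new ArrayList<>(map.keySet()); }

        public void put(String key, Object value) { map.put(key, value); }

        public void putAll(Collection<Entry> entries) {
            for (Entry entry : entries) {
                map.put(entry.key, entry.value);
            }
        }

        public void remove(String key) { map.remove(key); }

        public void removeAll(Collection<String> keys) {
            for (String key : keys) {
                map.remove(key);
            }
        }

        public void shutdown() { map.clear(); }
    }

    public static void main(String[] args) {
        MapStore store = new MapStore();
        store.putAll(Arrays.asList(new Entry("tuple-1", "cow"), new Entry("tuple-2", "moon")));
        System.out.println(store.get("tuple-1"));
        store.removeAll(Arrays.asList("tuple-1", "tuple-2"));
        System.out.println(store.get("tuple-1"));
    }
}
```

An in-memory store like this loses its contents on restart, so it is only suited to experiments; a durable backend such as the HBase one is what preserves window state across failures.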

Windowing operation in trident stream is a TridentProcessor implementation which has the below lifecycle for each batch of the tuples received.

// This is invoked when a new batch of tuples is received.
void startBatch(ProcessorContext processorContext);

// This is invoked for each tuple of a batch.
void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple);

// This is invoked for a batch to make it complete. All the tuples of this batch would have been already invoked with execute(ProcessorContext processorContext, String streamId, TridentTuple tuple)
void finishBatch(ProcessorContext processorContext);

Each tuple is received in the window operation through WindowTridentProcessor#execute(ProcessorContext processorContext, String streamId, TridentTuple tuple), and these tuples are accumulated for each batch. When a batch is finished, the respective tuple information is added to the window and the tuples are saved in the configured WindowsStore. Bolts for the respective window operations fire a trigger according to the given windowing configuration (like tumbling/sliding count or time). These triggers compute the aggregated result according to the given Aggregator. These results are emitted as part of the current batch if it exists. When a trigger is fired outside a WindowTridentProcessor#finishBatch invocation, that trigger is stored in the given WindowsStore and emitted as part of the next immediate batch from that window's processor.

Sample application

Example of using HBaseWindowsStoreFactory for windowing can be seen below.

// window-state table should already be created with cf:tuples column
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(),
"window-state",
"cf".getBytes("UTF-8"),
"tuples".getBytes("UTF-8"));
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"),
3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"),
new Values("to be or not to be the person"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout)
.parallelismHint(16)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
LOG.info("Received tuple: [{}]", input);
}
});
StormTopology stormTopology = topology.build();
Example applications of these APIs are located at TridentHBaseWindowingStoreTopology and TridentWindowingInmemoryStoreTopology
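
The batch lifecycle described earlier (startBatch, execute, finishBatch, plus triggers that fire between batches) can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: all names are hypothetical, the aggregate is a simple count, the trigger clears the window as a tumbling window would, and nothing here reflects how WindowTridentProcessor is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone illustration; this is not Storm code.
final class WindowLifecycleSketch {
    private final List<String> currentBatch = new ArrayList<>();
    private final List<String> window = new ArrayList<>();
    // Results of triggers that fired outside finishBatch, waiting for the next batch.
    private final List<Integer> pendingResults = new ArrayList<>();

    // A new batch starts: forget anything left from an earlier batch.
    void startBatch() {
        currentBatch.clear();
    }

    // Called once per tuple; tuples are only accumulated here.
    void execute(String tuple) {
        currentBatch.add(tuple);
    }

    // Simulates a trigger firing between batches: aggregate now, emit later.
    void onTrigger() {
        pendingResults.add(window.size());
        window.clear();
    }

    // The batch is complete: move its tuples into the window and
    // emit every result that was stored since the previous batch.
    List<Integer> finishBatch() {
        window.addAll(currentBatch);
        currentBatch.clear();
        List<Integer> emitted = new ArrayList<>(pendingResults);
        pendingResults.clear();
        return emitted;
    }

    public static void main(String[] args) {
        WindowLifecycleSketch processor = new WindowLifecycleSketch();
        processor.startBatch();
        processor.execute("the");
        processor.execute("cow");
        System.out.println("batch 1 emits " + processor.finishBatch());
        processor.onTrigger(); // fires outside finishBatch
        processor.startBatch();
        processor.execute("jumped");
        System.out.println("batch 2 emits " + processor.finishBatch());
    }
}
```

The point of the model is the deferral: a result computed between batches is held back and only leaves the processor as part of the next batch.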
07-28-2016
05:55 AM
@Narendra Bidari This is about the second part of the question, regarding group-id. You can try setting a txnId value for the spout with the API below (TridentTopology#newStream); it acts like a consumer group-id and is used in maintaining the opaque transactional spout's state in ZK.
Stream stream = tridentTopology.newStream(txnId, spout);
07-28-2016
05:46 AM
For more details about the conversation, see the mail thread here