Support Questions

Find answers, ask questions, and share your expertise

Where can I find a good example of a Storm topology that uses multiple Kafka spouts taking advantage of Kafka partitions?

Rising Star
1 ACCEPTED SOLUTION


Here is a simple topology: https://github.com/hortonworks/storm/tree/2.3-maint/external/storm-kafka-example

And here is one with Trident, using an opaque Trident spout: https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/trident/Tri...

Adding another KafkaSpout works the same way as shown in the first link: declare another KafkaSpout and add it to your topology.

SpoutConfig spoutCfg1 = new SpoutConfig(zkHosts, topic1, zkRoot, zkSpoutId1);
KafkaSpout kafkaSpout1 = new KafkaSpout(spoutCfg1);

SpoutConfig spoutCfg2 = new SpoutConfig(zkHosts, topic2, zkRoot, zkSpoutId2);
KafkaSpout kafkaSpout2 = new KafkaSpout(spoutCfg2);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout1", kafkaSpout1, 4);
builder.setSpout("kafka-spout2", kafkaSpout2, 4);
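For context, a minimal sketch of how the two spouts above could be wired into a complete, submittable topology. The class name, ZooKeeper host, topic names, spout ids, and the commented-out bolt are all illustrative assumptions, not from the original post; this targets the backtype-era storm-kafka API used in the linked example.

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class TwoTopicTopology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper ensemble shared by both spouts (host is illustrative)
        ZkHosts zkHosts = new ZkHosts("localhost:2181");

        // One SpoutConfig per topic; distinct spout ids keep each spout's
        // consumer offsets separate under the same ZooKeeper root
        SpoutConfig spoutCfg1 = new SpoutConfig(zkHosts, "topic1", "/kafka-spout", "spout-id-1");
        SpoutConfig spoutCfg2 = new SpoutConfig(zkHosts, "topic2", "/kafka-spout", "spout-id-2");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout1", new KafkaSpout(spoutCfg1), 4);
        builder.setSpout("kafka-spout2", new KafkaSpout(spoutCfg2), 4);

        // Downstream bolts subscribe to either or both spouts, e.g.:
        // builder.setBolt("merge", new MyBolt(), 2)
        //        .shuffleGrouping("kafka-spout1")
        //        .shuffleGrouping("kafka-spout2");

        StormSubmitter.submitTopology("two-topic-topology", new Config(), builder.createTopology());
    }
}
```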



8 REPLIES

Explorer

What do you mean by multiple Kafka spouts? When you configure a spout to consume from a topic, that single spout declaration is executed as multiple instances according to the parallelism hint (ideally equal to the number of Kafka partitions) and the number of workers you set; the instances run in different worker JVMs, each consuming from a Kafka partition. You can find a Trident example here: https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/trident/Tri...

Rising Star

Yes, to be clear, I meant having multiple instances of a Kafka spout reading from the multiple partitions of a single topic. As long as the parallelism hint for the KafkaSpout matches the number of partitions, is this handled automatically?


No, you cannot share the same topic among multiple spouts. If your parallelism is lower than the number of topic partitions, each executor of the Kafka spout will get multiple partitions to read from. Any reason you are looking to do this?

Rising Star

My question was unclear: I have a topic with 4 partitions, and I wanted to know how to wire up a spout to read from all partitions simultaneously. I now know that setting my spout parallelism to match the number of partitions accomplishes this automatically; I had incorrectly assumed that more configuration was required.
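That setup can be sketched in a few lines. Assuming a topic named "mytopic" with 4 partitions (the topic, host, and id names below are illustrative), one spout declaration with a matching parallelism hint is all that is needed:

```java
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class SingleTopicTopology {
    public static void main(String[] args) {
        // Topic "mytopic" is assumed to have 4 partitions
        ZkHosts zkHosts = new ZkHosts("localhost:2181");
        SpoutConfig cfg = new SpoutConfig(zkHosts, "mytopic", "/kafka-spout", "my-spout-id");

        TopologyBuilder builder = new TopologyBuilder();
        // Parallelism hint of 4 = one spout executor per partition;
        // storm-kafka divides the partitions among the executors automatically
        builder.setSpout("kafka-spout", new KafkaSpout(cfg), 4);
    }
}
```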

Expert Contributor

A Storm topology that uses one Kafka spout instance is nearly identical to one that uses multiple instances. To scale the Kafka spout from one to many instances, simply increase the parallelism hint for the spout. This can be done in Java code when the topology is defined, or via configuration applied to the topology at deployment time.

When you create the Kafka topic you will specify the overall number of partitions. You will want to set your Kafka Spout parallelism hint to this same number.
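As one way to adjust parallelism after the topology is already deployed, Storm's `rebalance` CLI command can change a component's executor count without redefining the topology in code. The topology and component names below are illustrative assumptions; this is a cluster-dependent command, not something runnable in isolation:

```shell
# Raise the Kafka spout's executor count to 4 on a running topology.
# "mytopology" and "kafka-spout" are illustrative names.
storm rebalance mytopology -e kafka-spout=4
```

Note that rebalancing can only redistribute among the tasks created at submission time, so it pairs well with setting a generous task count up front.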

Setting parallelism hint

Discussion on overloading partitions and spouts


Obviously it is on me to test it out 🙂 But any initial thoughts on what happens when you have fewer spout instances than the number of partitions for the Kafka topic? Either each spout instance doubles (or triples, or more) up on the partitions it takes care of, or we simply don't consume the messages on the partitions that have no spout instance.


Doh! @Sriharsha Chintalapani's answer in the comments of another reply tells me: "if you have parallelism lower than the topic partitions each executor of kafka spout will get multiple partitions to read from". Good stuff.
