Created 09-29-2015 08:22 PM
Created 09-29-2015 09:52 PM
Here is a simple topology https://github.com/hortonworks/storm/tree/2.3-maint/external/storm-kafka-example that uses OpaqueTridentSpout, and here is the one with Trident.
Adding another KafkaSpout works the same way as shown in the first link: declare another KafkaSpout and add it to your topology.
// Spout for topic1, with its own consumer id
SpoutConfig spoutCfg1 = new SpoutConfig(zkHosts, topic1, zkRoot, zkSpoutId1);
KafkaSpout kafkaSpout1 = new KafkaSpout(spoutCfg1);
// Spout for topic2, with a different consumer id
SpoutConfig spoutCfg2 = new SpoutConfig(zkHosts, topic2, zkRoot, zkSpoutId2);
KafkaSpout kafkaSpout2 = new KafkaSpout(spoutCfg2);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout1", kafkaSpout1, 4);
builder.setSpout("kafka-spout2", kafkaSpout2, 4);
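The two SpoutConfig objects above differ in their consumer id (zkSpoutId1 vs zkSpoutId2). As a rough illustration of why that matters, the plain-Java sketch below assumes offsets are committed under a ZooKeeper path of the form zkRoot/<spoutId>/partition_<n>, modeled on how storm-kafka stores them. The class OffsetPaths, the root and the ids are hypothetical, and the exact layout may differ between versions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not the storm-kafka API: each spout is assumed to commit
// its offsets under zkRoot/<spoutId>/partition_<n>.
class OffsetPaths {

    // Builds the assumed ZooKeeper path where one spout commits the offset of one partition.
    static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    // Lists the assumed offset paths for every partition a spout reads.
    static List<String> pathsFor(String zkRoot, String spoutId, int numPartitions) {
        List<String> paths = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            paths.add(committedPath(zkRoot, spoutId, p));
        }
        return paths;
    }

    public static void main(String[] args) {
        String zkRoot = "/kafka-spouts"; // hypothetical root
        // Distinct ids (like zkSpoutId1 / zkSpoutId2) keep the two spouts' offsets apart.
        System.out.println(pathsFor(zkRoot, "spout-topic1", 2));
        System.out.println(pathsFor(zkRoot, "spout-topic2", 2));
    }
}
```

Under this model, if both spouts used the same zkRoot and the same id, partition 0 of topic1 and partition 0 of topic2 would map to the same node and overwrite each other's offsets.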
Created 09-29-2015 08:30 PM
What do you mean by multiple Kafka spouts? When you configure a spout to consume from a topic, that single spout definition is executed as multiple instances in different worker JVMs, depending on the parallelism hint (ideally equal to the number of Kafka partitions) and the number of workers you set. Each instance consumes from a Kafka partition. You can find a Trident example here https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/trident/Tri...
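A simplified way to picture how one spout definition ends up running in several worker JVMs is to deal its executors round-robin onto the workers. This even spread is an assumption for illustration (ExecutorSpread is a hypothetical helper, not Storm's scheduler), but it shows that a parallelism hint of 4 on 2 workers puts two spout executors in each JVM.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not Storm's scheduler code): the executors of
// one spout are dealt round-robin onto worker JVMs.
class ExecutorSpread {

    // Returns, for each worker, the list of executor indices it would host.
    static List<List<Integer>> spread(int parallelismHint, int numWorkers) {
        if (parallelismHint < 1 || numWorkers < 1) {
            throw new IllegalArgumentException("parallelism hint and worker count must be >= 1");
        }
        List<List<Integer>> workers = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            workers.add(new ArrayList<>());
        }
        for (int e = 0; e < parallelismHint; e++) {
            workers.get(e % numWorkers).add(e);
        }
        return workers;
    }

    public static void main(String[] args) {
        // 4 executors (one per partition of a 4-partition topic) on 2 workers.
        List<List<Integer>> layout = spread(4, 2);
        for (int w = 0; w < layout.size(); w++) {
            System.out.println("worker " + w + " hosts executors " + layout.get(w));
        }
    }
}
```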
Created 09-30-2015 01:50 PM
Yes, to be clear, I meant having multiple instances of a Kafka spout reading from the multiple partitions of a single topic. As long as the parallelism hint for the KafkaSpout matches the number of partitions, is this handled automatically?
Created 10-01-2015 02:31 AM
No, you cannot share the same topic among multiple topics. If you have a parallelism lower than the number of topic partitions, each executor of the Kafka spout will get multiple partitions to read from. Is there any reason you are looking to do this?
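The partition split described above can be modeled with a simple round-robin assignment, in the spirit of what storm-kafka does. PartitionAssignment is a hypothetical helper, not the library's code. Executor t of N reads partitions t, t + N, t + 2N, and so on, so with 4 partitions and a parallelism of 2 each executor reads two partitions, while with a parallelism of 6 two executors get nothing to read.

```java
import java.util.ArrayList;
import java.util.List;

// Round-robin sketch of splitting topic partitions among spout executors;
// modeled on, but not copied from, storm-kafka's assignment.
class PartitionAssignment {

    // Partitions that executor taskIndex (0-based) out of totalTasks would read.
    static List<Integer> partitionsForTask(int taskIndex, int totalTasks, int numPartitions) {
        if (totalTasks < 1 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("need 0 <= taskIndex < totalTasks");
        }
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // Equal, lower and higher parallelism than the partition count.
        for (int tasks : new int[] {4, 2, 6}) {
            System.out.println("parallelism " + tasks + ":");
            for (int t = 0; t < tasks; t++) {
                System.out.println("  executor " + t + " -> " + partitionsForTask(t, tasks, numPartitions));
            }
        }
    }
}
```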
Created 10-01-2015 10:16 PM
My question was unclear. I have a topic with 4 partitions, and I wanted to know how to wire up a spout to read from all partitions simultaneously. I now know that if I set my spout parallelism to match the number of partitions, this happens automatically. I had incorrectly assumed that more configuration was required to achieve this.
Created 09-29-2015 08:35 PM
The Storm topology which uses one spout should be nearly identical to the topology which uses multiple spouts. To increase the Kafka Spout from one instance to many, simply increase the "parallelism hint" for the Kafka Spout. This can be done in Java code when the topology is defined, or via configuration passed into the topology (so it can be set at deployment time).
When you create the Kafka topic you will specify the overall number of partitions. You will want to set your Kafka Spout parallelism hint to this same number.
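Putting these two points together, here is a minimal sketch of choosing the parallelism hint at deployment time: read it from a config map and fall back to the topic's partition count. The key kafka.spout.parallelism and the class SpoutParallelism are hypothetical. In a real topology the value would typically come from the topology configuration or a command-line argument and would be passed as the parallelism argument of builder.setSpout(...).

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: pick the spout parallelism hint from a deployment-time config map,
// falling back to the topic's partition count. The key below is hypothetical.
class SpoutParallelism {

    static final String KEY = "kafka.spout.parallelism"; // hypothetical config key

    static int resolve(Map<String, Object> conf, int numPartitions) {
        Object value = conf.get(KEY);
        int hint = (value == null) ? numPartitions : Integer.parseInt(value.toString());
        if (hint < 1) {
            throw new IllegalArgumentException(KEY + " must be >= 1, got " + hint);
        }
        if (hint != numPartitions) {
            // Fewer executors read several partitions each; extra executors sit idle.
            System.err.println("warning: parallelism " + hint + " != " + numPartitions + " partitions");
        }
        return hint;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(resolve(conf, 4)); // no override: uses the partition count
        conf.put(KEY, "2");
        System.out.println(resolve(conf, 4)); // deployment-time override
    }
}
```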
Created 09-22-2016 04:27 PM
Obviously it is on me to test it out 🙂 BUT... any initial thoughts on what happens when you have fewer spout instances than the number of partitions for the Kafka topic? Clearly, either each spout instance doubles (or triples, or more) up on the partitions it takes care of, or we just don't consume the messages on the partitions that don't have a spout instance.
Created 09-22-2016 04:30 PM
Doh! @Sriharsha Chintalapani's answer to the questions in the comments section of another answer tells me: "if you have parallelism lower than the topic partitions each executor of kafka spout will get multiple partitions to read from". Good stuff.