I'm building a topology that reads JSON messages from a Kafka container, generates alerts based on the content of each message, and then sends these (same) messages on to different topics.
The problem is that when I submit my topology, I get the following exception:
3872 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='alerts') #<InvalidTopologyException InvalidTopologyException(msg: Component: [kafkaboltforspeed] subscribes from non-existent stream: [default] of component [speedbolt])>
3876 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
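For context on what the message means: in Storm, a bolt's tuples go out on its "default" stream only if that stream has fields declared for it; if nothing is declared, the stream does not exist and any subscriber fails at submit time with exactly this complaint. A self-contained sketch of that rule (the `Registry` type below is a hypothetical stand-in to show the shape of the check, not the Storm API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamRegistrySketch {
    // Hypothetical stand-in for Storm's stream bookkeeping: a component's
    // streams only come into existence once fields are declared for them.
    static class Registry {
        final Map<String, List<String>> streams = new HashMap<>();

        void declare(String streamId, List<String> fields) {
            streams.put(streamId, fields);
        }

        // Mirrors the submit-time validation that produced the error above.
        void subscribe(String streamId) {
            if (!streams.containsKey(streamId)) {
                throw new IllegalStateException(
                        "subscribes from non-existent stream: [" + streamId + "]");
            }
        }
    }

    public static void main(String[] args) {
        Registry speedbolt = new Registry();
        // No declare(...) call, as with a bolt whose declareOutputFields() declares nothing:
        try {
            speedbolt.subscribe("default");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        speedbolt.declare("default", List.of("event"));
        speedbolt.subscribe("default"); // now the subscription is valid
    }
}
```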
This is my topology:
public static void main(String[] args) throws Exception {
    StTopology stTopology = new StTopology();
    String topicName = "clusteredstorm";
    BrokerHosts hosts = new ZkHosts("172.17.0.1:2181");
    SpoutConfig kafkaConf1 = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
    kafkaConf1.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf1);
    StBolt speedbolt = new StBolt();
    StBoltGeo geofencebolt = new StBoltGeo();
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafkaspout", kafkaSpout, 1);
    builder.setBolt("speedbolt", speedbolt, 1).shuffleGrouping("kafkaspout");
    builder.setBolt("geofencebolt", geofencebolt, 1).shuffleGrouping("kafkaspout");
    KafkaBolt kafkaboltforspeed = stTopology.getKafkaBolt();
    KafkaBolt kafkaboltforgeofence = stTopology.getKafkaBolt();
    builder.setBolt("kafkaboltforspeed", kafkaboltforspeed, 2).shuffleGrouping("speedbolt");
    builder.setBolt("kafkaboltforgeofence", kafkaboltforgeofence, 2).shuffleGrouping("geofencebolt");
    Config config = new Config();
    config.setDebug(true);
    config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1);
    config.setNumWorkers(1);
    Properties props = new Properties();
    props.put("metadata.broker.list", "172.17.0.1:9092");
    props.put("request.required.acks", "1");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    config.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("alerts", config, builder.createTopology());
}
getKafkaBolt():
private KafkaBolt getKafkaBolt() {
    KafkaBolt kafkaBolt = new KafkaBolt().withTopicSelector(new KafkaTopicSelector() {
        public String getTopic(Tuple tuple) {
            JSONObject eventJson = null;
            Fields fields = tuple.getFields();
            eventJson = (JSONObject) JSONSerializer.toJSON((String) tuple
                    .getValueByField(fields.get(0)));
            return eventJson.get("customerid").toString() + "_alerts";
        }
    });
    return kafkaBolt;
}
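Put another way, the selector's routing rule is just "per-customer alerts topic". A trivial sketch of that derivation, with the JSON lookup stubbed out (`topicFor` is a hypothetical helper, not part of the KafkaTopicSelector interface):

```java
public class TopicSelectorSketch {
    // Mirrors the getTopic() logic above: route each event to a
    // topic named after its customerid.
    static String topicFor(String customerId) {
        return customerId + "_alerts";
    }

    public static void main(String[] args) {
        System.out.println(topicFor("42")); // prints "42_alerts"
    }
}
```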
StBolt.execute():
public void execute(Tuple input, BasicOutputCollector collector) {
    Fields fields = input.getFields();
    try {
        eventJson = (JSONObject) JSONSerializer.toJSON((String) input
                .getValueByField(fields.get(0)));
        if (Double.parseDouble((String) eventJson.get("speed")) > 50.0) {
            collector.emit(input);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
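For reference, the filtering logic in execute() boils down to: pull the speed field out of the JSON payload and only pass the event on when it exceeds 50. A self-contained sketch of that predicate (a naive regex stands in for JSONSerializer here, purely for illustration):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SpeedFilter {
    // Naive extraction of "speed":"<value>" from a flat JSON string;
    // this stands in for the JSONSerializer.toJSON(...) call above.
    static final Pattern SPEED = Pattern.compile("\"speed\"\\s*:\\s*\"?([0-9.]+)\"?");

    static boolean exceedsLimit(String json, double limit) {
        Matcher m = SPEED.matcher(json);
        if (!m.find()) {
            return false; // no speed field: nothing to alert on
        }
        return Double.parseDouble(m.group(1)) > limit;
    }

    public static void main(String[] args) {
        System.out.println(exceedsLimit("{\"customerid\":\"42\",\"speed\":\"72.5\"}", 50.0)); // true
        System.out.println(exceedsLimit("{\"customerid\":\"42\",\"speed\":\"30\"}", 50.0));   // false
    }
}
```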
I don't know what's wrong. Any tips?