
Apache Storm — InvalidTopologyException — subscribes from non-existent stream

New Contributor

I'm building a topology that reads JSON messages from a Kafka container, generates alerts according to the content of each message, and then sends those same messages to different topics.

However, when I submit my topology I get the following exception:

3872 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='alerts') #<InvalidTopologyException InvalidTopologyException(msg:Component: [kafkaboltforspeed] subscribes from non-existent stream: [default] of component [speedbolt])>
3876 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

This is my topology:

public static void main(String[] args) throws Exception {
    StTopology stTopology = new StTopology();
    String topicName = "clusteredstorm";
    BrokerHosts hosts = new ZkHosts("172.17.0.1:2181");
    SpoutConfig kafkaConf1 = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
    kafkaConf1.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf1);
    StBolt speedbolt = new StBolt();
    StBoltGeo geofencebolt = new StBoltGeo();

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafkaspout", kafkaSpout, 1);
    builder.setBolt("speedbolt", speedbolt, 1).shuffleGrouping("kafkaspout");
    builder.setBolt("geofencebolt", geofencebolt, 1).shuffleGrouping("kafkaspout");

    KafkaBolt kafkaboltforspeed = stTopology.getKafkaBolt();
    KafkaBolt kafkaboltforgeofence = stTopology.getKafkaBolt();
    builder.setBolt("kafkaboltforspeed", kafkaboltforspeed, 2).shuffleGrouping("speedbolt");
    builder.setBolt("kafkaboltforgeofence", kafkaboltforgeofence, 2).shuffleGrouping("geofencebolt");

    Config config = new Config();
    config.setDebug(true);
    config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1);
    config.setNumWorkers(1);

    Properties props = new Properties();
    props.put("metadata.broker.list", "172.17.0.1:9092");
    props.put("request.required.acks", "1");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    config.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("alerts", config, builder.createTopology());
}

getKafkaBolt():

private KafkaBolt getKafkaBolt() {
    KafkaBolt kafkaBolt = new KafkaBolt().withTopicSelector(new KafkaTopicSelector() {
        public String getTopic(Tuple tuple) {
            JSONObject eventJson = null;
            Fields fields = tuple.getFields();
            eventJson = (JSONObject) JSONSerializer.toJSON((String) tuple
                    .getValueByField(fields.get(0)));
            return eventJson.get("customerid").toString() + "_alerts";
        }
    });
    // kafkaBolt.
    return kafkaBolt;
}

StBolt.execute():

public void execute(Tuple input, BasicOutputCollector collector) {
    Fields fields = input.getFields();
    try {
        eventJson = (JSONObject) JSONSerializer.toJSON((String) input
                .getValueByField(fields.get(0)));
        if (Double.parseDouble((String) eventJson.get("speed")) > 50.0) {
            collector.emit(input);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

I don't know what's wrong. Any tips?

1 REPLY

Cloudera Employee

It seems that StBolt (the `speedbolt` component) does not declare the `default` stream as an output, for example because it declares only a non-default stream. Could you paste the full code for StBolt, especially `declareOutputFields()`?
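
To make the error message concrete, here is a minimal stand-alone sketch of the kind of check that rejects the topology. It is a simplified model written for this thread, not Storm's actual validation code: the class `SubscriptionCheck` and its methods are hypothetical, and the assumption that `speedbolt` declares no output stream at all is only a guess until the full StBolt code is posted. What it relies on from Storm is only that `shuffleGrouping("speedbolt")` subscribes to the stream named `default`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the subscription check; NOT Storm's code.
final class SubscriptionCheck {
    // Component id -> ids of the streams that component declares as outputs.
    private final Map<String, Set<String>> declaredStreams = new HashMap<>();
    // Each entry is {subscriber id, source component id, source stream id}.
    private final List<String[]> subscriptions = new ArrayList<>();

    // Register a component together with the streams it declares (possibly none).
    SubscriptionCheck component(String id, String... streams) {
        Set<String> set = new HashSet<>();
        for (String s : streams) {
            set.add(s);
        }
        declaredStreams.put(id, set);
        return this;
    }

    // Record that `subscriber` reads stream `stream` of component `source`.
    SubscriptionCheck subscribe(String subscriber, String source, String stream) {
        subscriptions.add(new String[] {subscriber, source, stream});
        return this;
    }

    // Return one message per subscription that targets an undeclared component or stream.
    List<String> validate() {
        List<String> errors = new ArrayList<>();
        for (String[] s : subscriptions) {
            Set<String> streams = declaredStreams.get(s[1]);
            if (streams == null) {
                errors.add("Component: [" + s[0] + "] subscribes from non-existent component ["
                        + s[1] + "]");
            } else if (!streams.contains(s[2])) {
                errors.add("Component: [" + s[0] + "] subscribes from non-existent stream: ["
                        + s[2] + "] of component [" + s[1] + "]");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // Assumed situation: speedbolt declares no output stream.
        SubscriptionCheck broken = new SubscriptionCheck()
                .component("kafkaspout", "default")
                .component("speedbolt")
                .component("kafkaboltforspeed")
                .subscribe("speedbolt", "kafkaspout", "default")
                .subscribe("kafkaboltforspeed", "speedbolt", "default");
        System.out.println(broken.validate());

        // Once speedbolt declares the default stream, the check passes.
        SubscriptionCheck fixed = new SubscriptionCheck()
                .component("kafkaspout", "default")
                .component("speedbolt", "default")
                .component("kafkaboltforspeed")
                .subscribe("speedbolt", "kafkaspout", "default")
                .subscribe("kafkaboltforspeed", "speedbolt", "default");
        System.out.println(fixed.validate());
    }
}
```

In the real bolt, the counterpart of `component("speedbolt", "default")` would be a declaration inside `StBolt.declareOutputFields()`, such as `declarer.declare(new Fields("message"))`, where the field name `message` is only illustrative. If the bolt instead uses `declarer.declareStream(...)` with a custom stream id, the downstream bolt would need to name that stream in its grouping, for example `shuffleGrouping("speedbolt", streamId)`.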
