Support Questions

Find answers, ask questions, and share your expertise

run storm topology error

avatar
Contributor

when i run the storm topology at localhost like this

public class Client {
 public static void main(String[] args) throws Exception {
  DRPCClient client = new DRPCClient("10.10.12.XX", 3772);
  String[] words = { "hello", "storm", "drpc" };
  for (String word : words) {
   String result = client.execute("exclamation", word);
   System.out.println("Result for \"" + word + "\": " + result);
  }
 }
}

I got a error

7230-qq图片20160901150753.png

my storm.yaml is

dev.zookeeper.path : '/tmp/dev-storm-zookeeper'
drpc.childopts : '-Xmx768m '
drpc.invocations.port : 3773
drpc.port : 3772
drpc.queue.size : 128
drpc.request.timeout.secs : 600
drpc.worker.threads : 64
drpc_server_host : [hdp-m1]
java.library.path : '/usr/local/lib:/opt/local/lib:/usr/lib:/usr/hdp/current/storm-client/lib'
logviewer.appender.name : 'A1'
logviewer.childopts : '-Xmx128m '
logviewer.port : 8000
metrics.reporter.register : 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter'
nimbus.childopts : '-Xmx1024m  -javaagent:/usr/hdp/current/storm-nimbus/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8649,wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-nimbus/contrib/storm-jmxetric/conf/jmxetric-conf.xml,process=Nimbus_JVM'
nimbus.cleanup.inbox.freq.secs : 600
nimbus.file.copy.expiration.secs : 600
nimbus.inbox.jar.expiration.secs : 3600
nimbus.monitor.freq.secs : 120
nimbus.reassign : true
nimbus.seeds : ['hdp-m1']
nimbus.supervisor.timeout.secs : 60
nimbus.task.launch.secs : 120
nimbus.task.timeout.secs : 30
nimbus.thrift.max_buffer_size : 1048576
nimbus.thrift.port : 6627
nimbus.topology.validator : 'backtype.storm.nimbus.DefaultTopologyValidator'
nimbus_hosts : [hdp-m1]
storm.cluster.mode : 'distributed'
storm.local.dir : '/hadoop/storm'
storm.local.mode.zmq : false
storm.log.dir : '/var/log/storm'
storm.messaging.netty.buffer_size : 5242880
storm.messaging.netty.client_worker_threads : 1
storm.messaging.netty.max_retries : 30
storm.messaging.netty.max_wait_ms : 1000
storm.messaging.netty.min_wait_ms : 100
storm.messaging.netty.server_worker_threads : 1
storm.messaging.transport : 'backtype.storm.messaging.netty.Context'
storm.thrift.transport : 'backtype.storm.security.auth.SimpleTransportPlugin'
storm.zookeeper.connection.timeout : 15000
storm.zookeeper.port : 2181
storm.zookeeper.retry.interval : 1000
storm.zookeeper.retry.intervalceiling.millis : 30000
storm.zookeeper.retry.times : 5
storm.zookeeper.root : '/storm'
storm.zookeeper.servers : ['hdp-s2','hdp-m1','hdp-s1','hdp-m2']
storm.zookeeper.session.timeout : 20000
storm_ui_server_host : [hdp-m1]
supervisor.childopts : '-Xmx256m  -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=56431 -javaagent:/usr/hdp/current/storm-supervisor/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8650,wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-supervisor/contrib/storm-jmxetric/conf/jmxetric-conf.xml,process=Supervisor_JVM'
supervisor.heartbeat.frequency.secs : 5
supervisor.monitor.frequency.secs : 3
supervisor.slots.ports : [6700, 6701]
supervisor.worker.start.timeout.secs : 120
supervisor.worker.timeout.secs : 30
supervisor_hosts : [hdp-s3]
task.heartbeat.frequency.secs : 3
task.refresh.poll.secs : 10
topology.acker.executors : null
topology.builtin.metrics.bucket.size.secs : 60
topology.debug : false
topology.disruptor.wait.strategy : 'com.lmax.disruptor.BlockingWaitStrategy'
topology.enable.message.timeouts : true
topology.error.throttle.interval.secs : 10
topology.executor.receive.buffer.size : 1024
topology.executor.send.buffer.size : 1024
topology.fall.back.on.java.serialization : true
topology.kryo.factory : 'backtype.storm.serialization.DefaultKryoFactory'
topology.max.error.report.per.interval : 5
topology.max.replication.wait.time.sec : 60
topology.max.spout.pending : 1000
topology.max.task.parallelism : null
topology.message.timeout.secs : 30
topology.min.replication.count : 1
topology.optimize : true
topology.receiver.buffer.size : 8
topology.skip.missing.kryo.registrations : false
topology.sleep.spout.wait.strategy.time.ms : 1
topology.spout.wait.strategy : 'backtype.storm.spout.SleepSpoutWaitStrategy'
topology.state.synchronization.timeout.secs : 60
topology.stats.sample.rate : 0.05
topology.tick.tuple.freq.secs : null
topology.transfer.buffer.size : 1024
topology.trident.batch.emit.interval.millis : 500
topology.tuple.serializer : 'backtype.storm.serialization.types.ListDelegateSerializer'
topology.worker.childopts : null
topology.worker.shared.thread.pool.size : 4
topology.workers : 1
transactional.zookeeper.port : null
transactional.zookeeper.root : '/transactional'
transactional.zookeeper.servers : null
ui.childopts : '-Xmx768m '
ui.filter : null
ui.port : 8744
worker.childopts : '-Xmx768m  -javaagent:/usr/hdp/current/storm-client/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8650,wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-client/contrib/storm-jmxetric/conf/jmxetric-conf.xml,process=Worker_%ID%_JVM'
worker.heartbeat.frequency.secs : 1
zmq.hwm : 0
zmq.linger.millis : 5000
zmq.threads : 1


and my topology is

public class BasicDRPCTopology {
 public static class ExclaimBolt extends BaseBasicBolt {
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
   String input = tuple.getString(1);
   collector.emit(new Values(tuple.getValue(0), input + "!"));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
   declarer.declare(new Fields("id", "result"));
  }
 }


 public static void main(String[] args) throws Exception {
  // Topology
  LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");

  // spout、bolt
  builder.addBolt(new ExclaimBolt(), 3);

  // Topology
  Config conf = new Config();

  if (args == null || args.length == 0) {
   LocalDRPC drpc = new LocalDRPC();

   // cluster
   LocalCluster cluster = new LocalCluster();

   // submit Topology
   cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
   for (String word : new String[] { "hello", "goodbye" }) {
    System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
   }

   // stop storm
   cluster.shutdown();
   drpc.shutdown();
  } else {
   conf.setNumWorkers(3);
   StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
  }
 }

my project works fine in storm-0.9.5.

what worng with my project? can some guy help me?

1 ACCEPTED SOLUTION

avatar
Contributor

I solved this problem.This problem was caught by supervisor. You can shutdown the supervisor or reset the supervisor.

View solution in original post

2 REPLIES 2

avatar
Contributor

both of node of nimbus and node of supervisor was configurated DRPC.

avatar
Contributor

I solved this problem.This problem was caught by supervisor. You can shutdown the supervisor or reset the supervisor.