Created on 09-01-2016 07:16 AM - edited 08-19-2019 02:28 AM
When I run the Storm topology at localhost like this:
/**
 * Minimal DRPC client: sends each sample word to the "exclamation" DRPC
 * function on the server at 10.10.12.XX:3772 and prints the returned result.
 */
public class Client {

    /**
     * Executes the "exclamation" DRPC function once per sample word and
     * prints each result to standard output.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if creating the client or executing a request fails
     */
    public static void main(String[] args) throws Exception {
        DRPCClient client = new DRPCClient("10.10.12.XX", 3772);
        try {
            String[] words = { "hello", "storm", "drpc" };
            for (String word : words) {
                String result = client.execute("exclamation", word);
                System.out.println("Result for \"" + word + "\": " + result);
            }
        } finally {
            // Release the connection even when a request fails.
            client.close();
        }
    }
}
I got an error.
My storm.yaml is:
dev.zookeeper.path : '/tmp/dev-storm-zookeeper' drpc.childopts : '-Xmx768m ' drpc.invocations.port : 3773 drpc.port : 3772 drpc.queue.size : 128 drpc.request.timeout.secs : 600 drpc.worker.threads : 64 drpc_server_host : [hdp-m1] java.library.path : '/usr/local/lib:/opt/local/lib:/usr/lib:/usr/hdp/current/storm-client/lib' logviewer.appender.name : 'A1' logviewer.childopts : '-Xmx128m ' logviewer.port : 8000 metrics.reporter.register : 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter' nimbus.childopts : '-Xmx1024m -javaagent:/usr/hdp/current/storm-nimbus/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8649,wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-nimbus/contrib/storm-jmxetric/conf/jmxetric-conf.xml,process=Nimbus_JVM' nimbus.cleanup.inbox.freq.secs : 600 nimbus.file.copy.expiration.secs : 600 nimbus.inbox.jar.expiration.secs : 3600 nimbus.monitor.freq.secs : 120 nimbus.reassign : true nimbus.seeds : ['hdp-m1'] nimbus.supervisor.timeout.secs : 60 nimbus.task.launch.secs : 120 nimbus.task.timeout.secs : 30 nimbus.thrift.max_buffer_size : 1048576 nimbus.thrift.port : 6627 nimbus.topology.validator : 'backtype.storm.nimbus.DefaultTopologyValidator' nimbus_hosts : [hdp-m1] storm.cluster.mode : 'distributed' storm.local.dir : '/hadoop/storm' storm.local.mode.zmq : false storm.log.dir : '/var/log/storm' storm.messaging.netty.buffer_size : 5242880 storm.messaging.netty.client_worker_threads : 1 storm.messaging.netty.max_retries : 30 storm.messaging.netty.max_wait_ms : 1000 storm.messaging.netty.min_wait_ms : 100 storm.messaging.netty.server_worker_threads : 1 storm.messaging.transport : 'backtype.storm.messaging.netty.Context' storm.thrift.transport : 'backtype.storm.security.auth.SimpleTransportPlugin' storm.zookeeper.connection.timeout : 15000 storm.zookeeper.port : 2181 storm.zookeeper.retry.interval : 1000 storm.zookeeper.retry.intervalceiling.millis : 30000 storm.zookeeper.retry.times : 5 storm.zookeeper.root 
: '/storm' storm.zookeeper.servers : ['hdp-s2','hdp-m1','hdp-s1','hdp-m2'] storm.zookeeper.session.timeout : 20000 storm_ui_server_host : [hdp-m1] supervisor.childopts : '-Xmx256m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=56431 -javaagent:/usr/hdp/current/storm-supervisor/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8650,wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-supervisor/contrib/storm-jmxetric/conf/jmxetric-conf.xml,process=Supervisor_JVM' supervisor.heartbeat.frequency.secs : 5 supervisor.monitor.frequency.secs : 3 supervisor.slots.ports : [6700, 6701] supervisor.worker.start.timeout.secs : 120 supervisor.worker.timeout.secs : 30 supervisor_hosts : [hdp-s3] task.heartbeat.frequency.secs : 3 task.refresh.poll.secs : 10 topology.acker.executors : null topology.builtin.metrics.bucket.size.secs : 60 topology.debug : false topology.disruptor.wait.strategy : 'com.lmax.disruptor.BlockingWaitStrategy' topology.enable.message.timeouts : true topology.error.throttle.interval.secs : 10 topology.executor.receive.buffer.size : 1024 topology.executor.send.buffer.size : 1024 topology.fall.back.on.java.serialization : true topology.kryo.factory : 'backtype.storm.serialization.DefaultKryoFactory' topology.max.error.report.per.interval : 5 topology.max.replication.wait.time.sec : 60 topology.max.spout.pending : 1000 topology.max.task.parallelism : null topology.message.timeout.secs : 30 topology.min.replication.count : 1 topology.optimize : true topology.receiver.buffer.size : 8 topology.skip.missing.kryo.registrations : false topology.sleep.spout.wait.strategy.time.ms : 1 topology.spout.wait.strategy : 'backtype.storm.spout.SleepSpoutWaitStrategy' topology.state.synchronization.timeout.secs : 60 topology.stats.sample.rate : 0.05 topology.tick.tuple.freq.secs : null topology.transfer.buffer.size : 1024 
topology.trident.batch.emit.interval.millis : 500 topology.tuple.serializer : 'backtype.storm.serialization.types.ListDelegateSerializer' topology.worker.childopts : null topology.worker.shared.thread.pool.size : 4 topology.workers : 1 transactional.zookeeper.port : null transactional.zookeeper.root : '/transactional' transactional.zookeeper.servers : null ui.childopts : '-Xmx768m ' ui.filter : null ui.port : 8744 worker.childopts : '-Xmx768m -javaagent:/usr/hdp/current/storm-client/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8650,wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-client/contrib/storm-jmxetric/conf/jmxetric-conf.xml,process=Worker_%ID%_JVM' worker.heartbeat.frequency.secs : 1 zmq.hwm : 0 zmq.linger.millis : 5000 zmq.threads : 1
And my topology is:
public class BasicDRPCTopology { public static class ExclaimBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { // Topology LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); // spout、bolt builder.addBolt(new ExclaimBolt(), 3); // Topology Config conf = new Config(); if (args == null || args.length == 0) { LocalDRPC drpc = new LocalDRPC(); // cluster LocalCluster cluster = new LocalCluster(); // submit Topology cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); for (String word : new String[] { "hello", "goodbye" }) { System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word)); } // stop storm cluster.shutdown(); drpc.shutdown(); } else { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } }
My project works fine in storm-0.9.5.
What is wrong with my project? Can someone help me?
Created 09-08-2016 06:38 AM
I solved this problem. The problem was caused by the supervisor. You can shut down the supervisor or reset it.
Created 09-01-2016 07:23 AM
Both the Nimbus node and the supervisor node were configured with DRPC.
Created 09-08-2016 06:38 AM
I solved this problem. The problem was caused by the supervisor. You can shut down the supervisor or reset it.