Is there a method in Storm like Spark's awaitTermination?

Contributor

Spark has awaitTermination. Is there an equivalent method in Storm?

Any help would be appreciated.

Thanks.

1 ACCEPTED SOLUTION

Super Guru

Can you tell me more about your use case? In Spark, awaitTermination does not stop the StreamingContext by itself. A Spark application calls it so that the main thread blocks until the streaming computation is stopped or fails, which lets the executors keep processing in the meantime. In Storm, a topology runs until it is killed; you can kill a topology from the UI as well as from the CLI.
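
As a rough illustration of the "main thread blocks while workers run" idea, here is a minimal sketch in plain Java. It uses the JDK's ExecutorService, which happens to have a method with the same name; it is only an analogy and is not the Spark API. The class name AwaitTerminationAnalogy and the method runAndWait are made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: JDK-only analogy, not Spark or Storm code.
final class AwaitTerminationAnalogy {
    private AwaitTerminationAnalogy() {}

    /** Runs taskCount small tasks on worker threads and returns how many finished. */
    static int runAndWait(int taskCount) {
        AtomicInteger processed = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(4);
        for (int i = 0; i < taskCount; i++) {
            workers.execute(() -> processed.incrementAndGet());
        }
        workers.shutdown(); // stop accepting new work; already queued tasks still run
        try {
            // The calling thread blocks here until the workers finish or the timeout expires.
            if (!workers.awaitTermination(30, TimeUnit.SECONDS)) {
                workers.shutdownNow(); // give up on whatever is still pending
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return processed.get();
    }
}
```

The point of the sketch is only that the caller waits on the workers instead of guessing a sleep duration. A streaming job differs in that the work never runs out on its own, so the wait ends only when something stops the job.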

7 REPLIES

Contributor

Thanks for the reply.

We are reading messages from Kafka and writing them to an HBase table.

TopologyBuilder builder = new TopologyBuilder();
// Kafka spout -> bolt that reads the topic messages -> bolt that writes to HBase
builder.setSpout("message-spout", kafkaSpout);
builder.setBolt("message-bolt", new ReadTopicBolt()).shuffleGrouping("message-spout");
builder.setBolt("hbase-bolt", new HbasePutBolt2()).shuffleGrouping("message-bolt");
cluster.submitTopology("KafkaStorm", config, builder.createTopology());
Thread.sleep(100000); // keep the main thread alive for 100 seconds while the topology runs

But when there are more messages, we have to increase the sleep time every time.

We used to use awaitTermination in Spark.

Is there equivalent functionality in Storm that we can use, i.e. a way to let the spout and bolts finish processing completely and then shut down the topology?

Super Guru

You are running the topology in local cluster mode. It is better to run it using StormSubmitter, which creates the topology and submits it to the remote cluster:

StormSubmitter.submitTopology("topology_name", config, builder.createTopology());

To understand how LocalCluster and StormSubmitter work, refer to the main method in https://github.com/rajkrrsingh/StormSampleApp/blob/master/src/main/java/com/rajkrrsingh/storm/Sample...

Contributor

Thanks for the reply.

We want to run it on a local cluster only.

Super Guru

LocalCluster does not have any such feature. Topologies never terminate on their own, so you need to rely on Thread.sleep.
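
If a fixed sleep is too rigid, one possible workaround is to sleep in small steps until the topology has been quiet for a while, with an upper bound as a safety net. The sketch below is plain Java and not a Storm feature. The class ActivityTracker and its methods markActivity and awaitIdle are hypothetical names, and the approach assumes that the bolts run in the same JVM as the main method (as with a local cluster), so that they can share a static field. A pause in incoming Kafka messages would also look like "idle", so this is a heuristic rather than a guarantee that everything was processed.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of Storm: remembers when a tuple was last processed.
final class ActivityTracker {
    private static final AtomicLong LAST_ACTIVITY_NANOS = new AtomicLong(System.nanoTime());

    private ActivityTracker() {}

    /** Intended to be called from a bolt's execute() after a tuple is handled. */
    static void markActivity() {
        LAST_ACTIVITY_NANOS.set(System.nanoTime());
    }

    /**
     * Blocks until no activity has been recorded for idleMillis, or until
     * maxWaitMillis has elapsed. Returns true if the idle period was reached,
     * false on timeout or if the waiting thread was interrupted.
     */
    static boolean awaitIdle(long idleMillis, long maxWaitMillis) {
        markActivity(); // start the idle window at the moment of the call
        long idleNanos = TimeUnit.MILLISECONDS.toNanos(idleMillis);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        long stepMillis = Math.max(1, Math.min(50, idleMillis)); // polling interval
        while (System.nanoTime() - deadline < 0) {
            long quietFor = System.nanoTime() - LAST_ACTIVITY_NANOS.get();
            if (quietFor >= idleNanos) {
                return true;
            }
            try {
                Thread.sleep(stepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
        return false;
    }
}
```

With something like this in place, the fixed sleep after submitTopology could become a call such as ActivityTracker.awaitIdle(30000, 600000), followed by the usual shutdown of the local cluster. The two durations are placeholders and would need tuning for the actual message rate.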

Contributor

Thanks a lot

Rising Star

You can kill a Storm topology as shown below. Use set_wait_secs to set some buffer time so that messages already in the topology are completely processed before the topology is killed. It is equivalent to the -w option of the storm kill CLI command.

Map conf = Utils.readStormConfig();
Client client = NimbusClient.getConfiguredClient(conf).getClient();
KillOptions killOpts = new KillOptions();
killOpts.set_wait_secs(waitSeconds); // time to wait before killing
client.killTopologyWithOpts(topology_name, killOpts); //provide topology name

I'm not sure there is any direct way to achieve what you want without either 1) changing this value for every run or 2) setting it to a very high value (like 10 min) so that all messages are likely to be processed before the topology is killed. Please keep in mind that the main use case of Storm is continuous computation on data, with your topologies running forever.