Created 01-13-2016 01:20 PM
Hello, I am newbie to Storm...I just executed a simple Word Count program in Storm using StormSubmitter ...I had set the debugging property to "true" ....But where do I get the output of my Word Count program ?? And is there any specific that you will see the output only after terminating Storm cluster or killing the topology?? Thank You
Created 01-13-2016 01:22 PM
Created 01-13-2016 02:56 PM
@Akshay Shingote The output depends on what your Storm bolts are actually doing. If you are just printing to System out then those will be printed on console. I know some version of WordCount programs are actually writing to a file somewhere on local file system. Can you tell if the bolt is writing to sys out or a file?
Created 01-13-2016 02:58 PM
what's the name of the bolt you're using? You can go to the supervisor node, to /var/log/storm/ and tail the log for supervisor, you'd see the printout. @Akshay Shingote
Created 01-14-2016 07:18 AM
@bsaini : The bolt is just writing it to sysout & also I have enable Loggers so with sysout,I am also using Loggers (slf4j)....In the logs of my application (Storm Word Count-3-1452753765-worker-6700.log) I do see which words have been emitted but I don`t see their count anywhere.....
Created 01-14-2016 07:20 AM
@Artem Ervits : I am using 2 bolts WordNormalizer & WordCounter where WordCounter does the final job of counting the words present in input text file
Created 01-13-2016 07:13 PM
When you setDebug(true), storm will print a bunch of information to stdout which is really only useful for a very small topology you're running locally. For example, you could run your topology inside Eclipse and see the output in the Console window.
If you are deploying to a cluster, take a look at my blog post about Getting started with Storm logging. The general gist is to use SLF4J and LOG.info statements which are then visible in the Storm UI.
Created 01-14-2016 07:22 AM
This is my code for WordNormalizer Bolt public class WordNormalizer extends BaseBasicBolt { public void cleanup() {} /** * The bolt will receive the line from the * words file and process it to Normalize this line * * The normalize will be put the words in lower case * and split the line to get all words in this */ public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word = word.toLowerCase(); collector.emit(new Values(word)); } } } /** * The bolt will only emit the field "word" */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
Created 01-14-2016 07:23 AM
This is my code for WordCounter Bolt public class WordCounter extends BaseBasicBolt { Integer id; String name; Map<String, Integer> counters; private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class); /** * At the end of the spout (when the cluster is shutdown * We will show the word counters */ @Override public void cleanup() { System.out.println("-- Word Counter ["+name+"-"+id+"] --"); for(Map.Entry<String, Integer> entry : counters.entrySet()){ System.out.println("Count for word :: "+entry.getKey()+ " is :: "+entry.getValue()); LOG.info("Count for "+entry.getKey() +" is :"+entry.getValue()); } } /** * On create */ @Override public void prepare(Map stormConf, TopologyContext context) { this.counters = new HashMap<String, Integer>(); this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } //@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} //@Override public void execute(Tuple input, BasicOutputCollector collector) { String str = input.getString(0); /** * If the word dosn't exist in the map we will create * this, if not We will add 1 */ if(!counters.containsKey(str)){ counters.put(str, 1); }else{ Integer c = counters.get(str) + 1; counters.put(str, c); } } }
Created 01-14-2016 07:24 AM
@bsaini @Artem Ervits @Neeraj Sabharwal @Kit Menke This is my code where I am submitting my Topology....Above this I have also submitted the code for the Bolts namely WordCounter & WordNormalizer....what do I need to do in these bolts ?? Thank You
public class Program { public static void main(String[] args) { //Configuration Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader",new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(),1) .fieldsGrouping("word-normalizer", new Fields("word")); try { StormSubmitter.submitTopology("Storm Word Count", conf,builder.createTopology()); } catch (AlreadyAliveException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } catch (InvalidTopologyException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } catch (AuthorizationException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }