Created 06-01-2016 04:54 AM
Just started taking baby steps with Spark in Java. Below is a word count program that uses a stop word list to skip any words that appear in it. I have two accumulators: one counting the skipped words and one counting the kept words.
However, the System.out.println calls at the end of the program always show both accumulator values as 0.
Please point out where I am going wrong.
public static void main(String[] args) throws FileNotFoundException {
    SparkConf conf = new SparkConf();
    conf.setAppName("Third App - Word Count WITH BroadCast and Accumulator");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaRDD<String> fileRDD = jsc.textFile("hello.txt");

    JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String aLine) throws Exception {
            return Arrays.asList(aLine.split(" "));
        }
    });

    String[] stopWordArray = getStopWordArray();

    final Accumulator<Integer> skipAccumulator = jsc.accumulator(0);
    final Accumulator<Integer> unSkipAccumulator = jsc.accumulator(0);
    final Broadcast<String[]> stopWordBroadCast = jsc.broadcast(stopWordArray);

    JavaRDD<String> filteredWords = words.filter(new Function<String, Boolean>() {
        public Boolean call(String inString) throws Exception {
            boolean filterCondition = !Arrays.asList(stopWordBroadCast.getValue()).contains(inString);
            if (!filterCondition) {
                System.out.println("Filtered a stop word ");
                skipAccumulator.add(1);
            } else {
                unSkipAccumulator.add(1);
            }
            return filterCondition;
        }
    });

    System.out.println("$$$$$$$$Filtered Count " + skipAccumulator.value());
    System.out.println("$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());

    /* rest of code - works fine */

    jsc.stop();
    jsc.close();
}
Created 06-01-2016 04:55 AM
I am building a runnable jar and submitting the job on Hortonworks Sandbox 2.4 using spark-submit jobname.jar.
Created 06-01-2016 05:00 AM
Also tried the same thing in local mode in Eclipse using JavaSparkContext jsc = new JavaSparkContext("local[*]", "Application name"); but got the same result.
Created 06-01-2016 05:16 AM
@akeezhadath It looks like you are not calling an action, so the job never actually runs. Spark transformations are lazily evaluated; nothing executes until an action is invoked. Can you run a terminal operation on filteredWords, such as count() or collect(), and see whether the accumulator values are incremented?
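For example, a minimal sketch of that change, reusing the filteredWords RDD and the two accumulators from your snippet (names taken from your code):

// Trigger the job with an action so the filter (and its accumulator
// updates) actually runs; count() evaluates every partition.
long keptCount = filteredWords.count();

// Read the accumulators only after the action has completed.
System.out.println("Kept word count: " + keptCount);
System.out.println("$$$$$$$$Filtered Count " + skipAccumulator.value());
System.out.println("$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());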
Created 06-01-2016 05:16 AM
Good point!! Let me try that, @Rajkumar Singh.
Created 06-01-2016 05:19 AM
Got it. I added an action, first(), to force the job to trigger. And yes, the lazy evaluation you mentioned was exactly what was tripping me up.
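One caveat, assuming the input RDD has more than one partition: first() only evaluates as many partitions as it needs to return a single element, so the accumulators may end up reflecting only part of the data. An action that touches every element, such as count(), gives complete totals:

// first() can stop after the first partition; count() forces every
// element through the filter, so the accumulator totals are complete.
filteredWords.count();
System.out.println("Skipped: " + skipAccumulator.value()
        + ", kept: " + unSkipAccumulator.value());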