
Spark Java Accumulator not incrementing

Super Collaborator

Just getting started with baby steps in Spark with Java. Below is a word count program that uses a stop-word list to skip any words that appear in the list. I have two accumulators, one to count the skipped words and one to count the unskipped words.

However, the System.out.println calls at the end of the program always show both accumulator values as 0.

Please point out where I am going wrong.

// Imports used by this snippet (Spark 1.x Java API)
import java.io.FileNotFoundException;
import java.util.Arrays;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

public static void main(String[] args) throws FileNotFoundException {
    SparkConf conf = new SparkConf();
    conf.setAppName("Third App - Word Count WITH BroadCast and Accumulator");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaRDD<String> fileRDD = jsc.textFile("hello.txt");

    // Split each line into words
    JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String aLine) throws Exception {
            return Arrays.asList(aLine.split(" "));
        }
    });

    String[] stopWordArray = getStopWordArray(); // helper defined elsewhere in the class
    final Accumulator<Integer> skipAccumulator = jsc.accumulator(0);
    final Accumulator<Integer> unSkipAccumulator = jsc.accumulator(0);
    final Broadcast<String[]> stopWordBroadCast = jsc.broadcast(stopWordArray);

    // Keep only words that are not in the broadcast stop-word list,
    // counting skipped and kept words in the two accumulators
    JavaRDD<String> filteredWords = words.filter(new Function<String, Boolean>() {
        public Boolean call(String inString) throws Exception {
            boolean filterCondition = !Arrays.asList(stopWordBroadCast.getValue()).contains(inString);
            if (!filterCondition) {
                System.out.println("Filtered a stop word ");
                skipAccumulator.add(1);
            } else {
                unSkipAccumulator.add(1);
            }
            return filterCondition;
        }
    });

    System.out.println("$$$$$$$$Filtered Count " + skipAccumulator.value());
    System.out.println("$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());
    /* rest of code - works fine */
    jsc.stop();
    jsc.close();
}

5 REPLIES

Super Collaborator

I am building a runnable jar and submitting the job on the Hortonworks Sandbox 2.4 using spark-submit jobname.jar.

Super Collaborator
Also tried the same in local mode in Eclipse, using JavaSparkContext jsc = new JavaSparkContext("local[*]", "Application name"); but still got the same result.

Super Guru

@akeezhadath It looks like you are never calling an action, so the job is never actually triggered. Spark transformations are lazily evaluated; nothing runs until an action is invoked. Can you run a terminal operation on filteredWords, such as count() or collect(), and check whether the accumulator values are incremented?
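For example, something along these lines, reusing the variable names from the code posted above (count() is just one choice of action; collect() or first() would also trigger the job):

// filter() is a lazy transformation: no tasks run until an action is called,
// so without one the accumulators are never touched.
// (Sketch reusing filteredWords, skipAccumulator, unSkipAccumulator from the question.)
long keptWords = filteredWords.count(); // action: runs the job; accumulators update as a side effect

System.out.println("Kept word count " + keptWords);
System.out.println("$$$$$$$$Filtered Count " + skipAccumulator.value());
System.out.println("$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());

One caveat worth keeping in mind: Spark only guarantees exactly-once accumulator updates for updates made inside actions. Updates made inside transformations like filter() can be applied more than once if a task is retried or a stage is recomputed, so these counts should be treated as approximate in the presence of failures.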

Super Collaborator

Good point! Let me try that, @Rajkumar Singh.

Super Collaborator

Got it. I added the action first() to force the job to trigger, and yes, the lazy evaluation you mentioned was exactly what was stopping me.
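For anyone who finds this thread later, the change was just one extra line before the printouts (a sketch based on the code posted above):

// Any action forces evaluation. Note that first() may compute only as many
// partitions as needed to return one element, so the accumulators may not
// cover the whole file; count() is the safer choice if you want complete totals.
filteredWords.first();

System.out.println("$$$$$$$$Filtered Count " + skipAccumulator.value());
System.out.println("$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());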