Spark Java Accumulator not incrementing
Labels: Apache Spark
Created 06-01-2016 04:54 AM
Just started taking baby steps with Spark in Java. Below is a word-count program that uses a stop-word list to skip any words that appear in it. I have two accumulators to count the skipped and unskipped words. However, the System.out.println at the end of the program always reports both accumulator values as 0. Please point out where I am going wrong.
import java.io.FileNotFoundException;
import java.util.Arrays;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

public static void main(String[] args) throws FileNotFoundException {
    SparkConf conf = new SparkConf();
    conf.setAppName("Third App - Word Count WITH BroadCast and Accumulator");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Split each line of the input file into words.
    JavaRDD<String> fileRDD = jsc.textFile("hello.txt");
    JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String aLine) throws Exception {
            return Arrays.asList(aLine.split(" "));
        }
    });

    String[] stopWordArray = getStopWordArray();

    // One accumulator for skipped (stop) words, one for kept words.
    final Accumulator<Integer> skipAccumulator = jsc.accumulator(0);
    final Accumulator<Integer> unSkipAccumulator = jsc.accumulator(0);
    final Broadcast<String[]> stopWordBroadCast = jsc.broadcast(stopWordArray);

    // Keep only the words that are not in the broadcast stop-word list.
    JavaRDD<String> filteredWords = words.filter(new Function<String, Boolean>() {
        public Boolean call(String inString) throws Exception {
            boolean filterCondition = !Arrays.asList(stopWordBroadCast.getValue()).contains(inString);
            if (!filterCondition) {
                System.out.println("Filtered a stop word ");
                skipAccumulator.add(1);
            } else {
                unSkipAccumulator.add(1);
            }
            return filterCondition;
        }
    });

    System.out.println("$$$$$$$$Filtered Count " + skipAccumulator.value());
    System.out.println("$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());
    /* rest of code - works fine */
    jsc.stop();
    jsc.close();
}
Created 06-01-2016 04:55 AM
I am building a runnable jar and submitting the job on Hortonworks Sandbox 2.4 using spark-submit jobname.jar.
Created 06-01-2016 05:00 AM
Also tried the same thing in local mode from Eclipse, using JavaSparkContext jsc = new JavaSparkContext("local[*]", "Application name"); but got the same result.
Created 06-01-2016 05:16 AM
@akeezhadath it seems that you are not calling an action, so the job is never actually triggered. Spark transformations are lazily evaluated. Can you run a terminal operation on filteredWords, such as count() or collect(), and see whether the accumulator values are incremented?
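For later readers, a minimal sketch of that suggestion, reusing the variable and accumulator names from the question above:

// filter() is a lazy transformation: nothing runs, and no accumulator
// is updated, until an action such as count() or collect() executes it.
long keptWords = filteredWords.count();
System.out.println("Words kept after filtering: " + keptWords);

// Only after the action completes do the accumulators hold real values.
System.out.println("$$$$$$$$Filtered Count " + skipAccumulator.value());
System.out.println("$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());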
Created 06-01-2016 05:16 AM
Good point! Let me try that, @Rajkumar Singh.
Created 06-01-2016 05:19 AM
Got it. I added a first() action to force the job to trigger, and the accumulators now increment. And yes, the lazy evaluation you mentioned was exactly what was stopping me.
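One caveat for anyone landing here later, as a general note on Spark behavior rather than something raised in the thread: first() only evaluates as many partitions as it needs to return a single element, so on multi-partition input the accumulators may hold partial totals, whereas count() scans everything. Also, accumulator updates made inside a transformation are re-applied if the RDD is recomputed, so running a second action on an uncached filteredWords would increment the counters twice. A sketch of guarding against both, using the names from the question:

// Cache the filtered RDD so any later action reuses the computed
// partitions instead of re-running filter() and re-incrementing the
// accumulators (eviction could still force a recompute, so this
// reduces double counting rather than strictly preventing it).
filteredWords.cache();

// count() visits every partition, unlike first(), so the accumulators
// cover the whole input once this action finishes.
filteredWords.count();

System.out.println("Skipped words:   " + skipAccumulator.value());
System.out.println("Unskipped words: " + unSkipAccumulator.value());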
