Trying to understand Spark from a MapReduce perspective.
Let's take the word count example in both MapReduce and Spark and walk through what's happening.
MapReduce Word Count:
Mapper:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // As TextInputFormat is used, the key is the byte offset of the line in
    // the file; the actual line goes in the value.

    // Reuse the Writables to avoid GC pressure.
    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // Writable.set() replaces the previous content of the
            // Writable object with the new content.
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}
Reducer:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the 1s emitted for this word by the mappers.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
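The comparison table below references driver calls such as job.setMapperClass() and job.setNumReduceTasks(). For completeness, here is a minimal driver sketch that wires the two classes together; the class name WordCountDriver and the CLI argument order are assumptions, not part of the original:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // args[0] = input path, args[1] = output path (assumed CLI contract)
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // The number of reduce tasks is fixed in the driver
        // (compare with repartition() on the Spark side).
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}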
Spark Word Count:
JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);

stringJavaRDD
        // A new element is emitted for every word in the input line
        .flatMap(line -> Arrays.asList(line.split(" ")))
        // A new tuple (word, 1) is formed for every word
        .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
        // The reduceByKey API only takes the values; the key is not fed into the function.
        // reduceByKey results in a shuffle, hence breaking the DAG into stages.
        .reduceByKey((count1, count2) -> count1 + count2)
        // How many partitions to split the output into
        .repartition(conf.getInt(NUM_PARTITIONS, 1))
        .saveAsTextFile(outputPath);
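The snippet above assumes an already-constructed javaSparkContext (and a Hadoop Configuration named conf that carries NUM_PARTITIONS). A minimal setup sketch; the app name here is an assumption:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf().setAppName("word count"); // app name is an assumption
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

The pipeline itself additionally needs java.util.Arrays and scala.Tuple2 on the import list.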
| MapReduce | Spark |
| --- | --- |
| InputFormat = TextInputFormat | InputFormat = TextInputFormat. (The input and output formats come from Hadoop, hence they are the same.) |
| Mappers read data from their splits: job.setMapperClass(WordCountMapper.class); job.setInputFormatClass(TextInputFormat.class); | Reads data from the splits. Based on the InputFormat used, the number of map tasks spawned equals the number of splits: JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath); |
| In the Mapper you get a key and a value. Key = line offset, value = the line. | In a map task you get only values, since there is no concept of keys in Spark: you get the whole line as a JavaRDD<String>. Remember, it is the same TextInputFormat and LineRecordReader; Spark just takes the value from the RecordReader. |
| Inside the Mapper, each line is split into words: StringTokenizer tokenizer = new StringTokenizer(line); | flatMap(line -> Arrays.asList(line.split(" "))). flatMap is just a transformation applied to each input line, much like a UDF in Pig/Hive: the function is called for every row. A transformation can be attached to a map task, a reduce task, or another transformation; transformations are just chained functions: input line => function1 => function2 => function3 ... |
| In the Mapper, 1 is attached as the value for each word: context.write(word, one); | Chain another transformation to get the next transformed value: JavaRDD => flatMap => mapToPair(word -> new Tuple2<String, Integer>(word, 1)). This generates (key, value) pairs just like the Mapper in MapReduce. If you look at the Spark DAG, all of these transformations happen inside the same map task. |
| In the Reducer you collect all the counts for a word: for (IntWritable value : values) { ... } | reduceByKey((count1, count2) -> count1 + count2). Just like in MapReduce, the key is not part of the reduce function; and just like in MapReduce, reduceByKey results in a shuffle, after which a reduce task is executed. |
| The number of reducers is set in the driver: job.setNumReduceTasks() | repartition(conf.getInt(NUM_PARTITIONS, 1)) |
| Write to HDFS | Write to HDFS |
| Central object that holds all information about the job: Context context | JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf) |
| Output format = TextOutputFormat | Output format = TextOutputFormat |
| A Mapper and a Reducer run to perform the word count | The Spark code is scanned and translated into tasks (map and reduce tasks) |
| MapReduce has separate driver, Mapper, and Reducer code | In Spark, the driver and the tasks (map/reduce) are part of the same code |
| In MapReduce we have a Mapper and a Reducer; the Reducer leads to a shuffle | In Spark, whenever a shuffle happens, a new stage is formed (see the stage sketch below) |
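To actually see the stage boundary that reduceByKey introduces, you can print the RDD lineage. Below is a runnable sketch; it assumes the Spark 1.x Java API used throughout this article (on Spark 2.x, flatMap expects an Iterator instead of an Iterable), and local mode plus the input path as a CLI argument are assumptions for convenience:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountStages {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("word count stages").setMaster("local[*]"));

        JavaPairRDD<String, Integer> counts = sc
                .textFile(args[0])                                  // input path (assumed CLI arg)
                .flatMap(line -> Arrays.asList(line.split(" ")))
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        // The lineage printed here shows a ShuffledRDD: everything above the
        // shuffle runs in one stage (the "map side"), everything below it in
        // another (the "reduce side"), mirroring Mapper/Reducer in MapReduce.
        System.out.println(counts.toDebugString());

        sc.stop();
    }
}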