Introduction

Hadoop MapReduce is a software framework and programming model for processing large datasets. Programs written in the MapReduce style can be automatically parallelized and executed on a large cluster of machines. With the abstraction it provides, users can express simple computations while the framework hides the messy details of parallelization and fault tolerance. A job goes through six main stages: split input, start mappers, network transfer, start reducers, reducer output, and master cleanup.
  1. Split input: the input file is split into M pieces (M is the number of map tasks; each piece is typically 16 to 64 MB, corresponding to the data block size of the Hadoop file system). A quick sketch of this arithmetic follows the list.
  2. Start mappers: a worker machine that is assigned a map task reads its split of the input, parses the input key/value pairs, passes each pair to the user-defined map function, and generates the intermediate key/value pairs.
  3. Network transfer: the intermediate key/value pairs are stored on the local disk of the map worker and partitioned into R parts (R is the number of reducers).
  4. Start reducers: when all the map tasks are finished, the master notifies each reduce worker of where the intermediate key/value pairs are stored; the reducer reads the data from the local disks of the map workers via remote procedure calls and sorts all the pairs by the intermediate keys.
  5. Reducer output: the reducer iterates over the sorted intermediate pairs and passes each key, together with an iterable of the corresponding intermediate values, to the reduce function. The result is appended to a file in the global file system.
  6. Master cleanup: when all tasks are finished, the master wakes up the user program and control returns to the user code.
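As a minimal sketch of the stage 1 arithmetic (the 1 GB input and the 64 MB split size are made-up figures within the 16 to 64 MB range above):

public class SplitCount {
        public static void main(String[] args) {
                long fileSize = 1L << 30;   // a hypothetical 1 GB input file
                long splitSize = 64L << 20; // 64 MB split size (the block size)
                long m = (fileSize + splitSize - 1) / splitSize; // ceiling division
                System.out.println("M = " + m + " map tasks");   // prints M = 16
        }
}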

Problem

Here is a friend list. The letter on the left of the colon is a user and the list on the right of the colon contains that user's friends. How can we use MapReduce to find the common friends of every pair of users?
A : B C D
B : A C D E
C : A B D E
D : A B C E
E : B C D

Illustrative flow

For each user, iterate over his/her friends and generate a key consisting of the user and each friend. Note that the two names in the key must be sorted, or else the result cannot be obtained, since reduction is performed only on exactly matching keys (a small helper sketch after the reduce example below illustrates this).
A : B C D
 map =>
(A, B) => B C D
(A, C) => B C D
(A, D) => B C D
In the reducing phase, each intermediate key receives exactly two values, since only the two users named in that key can generate them. Compare the two values, i.e. the two friend lists, and keep only the friends that appear in both.
(A, B) => [B C D, A C D E]
 reduce =>
(A, B) => C D
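A minimal sketch of the sorted-key idea (PairKey and pairKey are hypothetical names, not part of the code below; the actual mapper sorts a two-element array instead):

public class PairKey {
        // Lexicographic order makes (A, B) and (B, A) collide on the key "A B",
        // so both users' friend lists meet in the same reduce call.
        static String pairKey(String user, String friend) {
                return user.compareTo(friend) < 0 ? user + " " + friend
                                                  : friend + " " + user;
        }

        public static void main(String[] args) {
                System.out.println(pairKey("A", "B")); // A B
                System.out.println(pairKey("B", "A")); // A B
        }
}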

Mapper

The input key for the mapper is of type LongWritable, holding the byte offset of the line in the input file, and the input value is of type Text, holding the content handed to the mapper (with the default TextInputFormat each call to map receives a single line, so the tokenizer loop below simply handles the general case). Here, we first split the input value into lines, one per user, and then iterate over all the friends in that user's friend list. The output collects an intermediate key/value pair for each (user, friend) combination.
StringTokenizer tokenizer = new StringTokenizer(value.toString(), "\n");
String line = null;
String[] lineArray = null;
String[] friendArray = null;
String[] tempArray = null;
while(tokenizer.hasMoreTokens()){
        line = tokenizer.nextToken();
        // "A : B C D" -> lineArray[0] = "A", lineArray[1] = "B C D"
        lineArray = line.split(" : ");
        friendArray = lineArray[1].split(" ");
        tempArray = new String[2];
        for(int i = 0; i < friendArray.length; i++){
                // Sort the (user, friend) pair so that both sides of a
                // friendship emit the same intermediate key, e.g. "A B".
                tempArray[0] = friendArray[i];
                tempArray[1] = lineArray[0];
                Arrays.sort(tempArray);
                // Key: the sorted pair; value: this user's whole friend list.
                output.collect(new Text(tempArray[0] + " " + tempArray[1]), new Text(lineArray[1]));
        }
}

Reducer

For each intermediate key, exactly two values are collected, one from each of the two users named in the key. Note that Hadoop reuses the same key and value objects on every invocation of next(), so the Text value must be copied with the copy constructor in order to preserve the first value.
public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, Text>{
        public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException{
                // Copy both values: Hadoop reuses the Text object returned by
                // next(), so storing the reference directly would leave two
                // aliases of the same (last) value.
                Text[] texts = new Text[2];
                int index = 0;
                while(values.hasNext()){
                        texts[index++] = new Text(values.next());
                }
                // The two friend lists belonging to the two users in the key.
                String[] list1 = texts[0].toString().split(" ");
                String[] list2 = texts[1].toString().split(" ");
                // Intersect the two lists with nested loops.
                List<String> list = new LinkedList<String>();
                for(String friend1 : list1){
                        for(String friend2 : list2){
                                if(friend1.equals(friend2)){
                                        list.add(friend1);
                                }
                        }
                }
                // Join the common friends with single spaces.
                StringBuffer sb = new StringBuffer();
                for(int i = 0; i < list.size(); i++){
                        sb.append(list.get(i));
                        if(i != list.size() - 1)
                                sb.append(" ");
                }
                output.collect(key, new Text(sb.toString()));
        }
}
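As a design note, the nested loops above cost O(n * m) comparisons. Here is a sketch of an O(n + m) intersection using a HashSet (Intersect and commonFriends are hypothetical names, and the sketch assumes the friend lists contain no duplicates):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class Intersect {
        static String commonFriends(String[] list1, String[] list2) {
                // Membership test against a set replaces the inner loop.
                Set<String> second = new HashSet<String>(Arrays.asList(list2));
                StringBuilder sb = new StringBuilder();
                for(String friend : list1){
                        if(second.contains(friend)){
                                if(sb.length() > 0)
                                        sb.append(" ");
                                sb.append(friend);
                        }
                }
                return sb.toString();
        }

        public static void main(String[] args) {
                // For the key (A, B) in the example above: prints "C D".
                System.out.println(commonFriends("B C D".split(" "), "A C D E".split(" ")));
        }
}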

Conclusion

After the process above, the common friend lists are produced. Here is the example output, and the complete code is listed below. But there is still one problem: if one user is not a friend of another user, the common friends of those two people are never generated. To implement that feature, the mapper would need global knowledge of all the users. How to implement this is left to you as an exercise.
A B     C D
A C     B D
A D     B C
B C     A D E
B D     A C E
B E     C D
C D     A B E
C E     B D
D E     B C
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
 
public class Friend {
 
        public static class Map extends MapReduceBase
                implements Mapper<LongWritable, Text, Text, Text>{
                public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                        throws IOException{
                        StringTokenizer tokenizer = new StringTokenizer(value.toString(), "\n");
                        String line = null;
                        String[] lineArray = null;
                        String[] friendArray = null;
                        String[] tempArray = null;
                        while(tokenizer.hasMoreTokens()){
                                line = tokenizer.nextToken();
                                lineArray = line.split(" : ");
                                friendArray = lineArray[1].split(" ");
                                tempArray = new String[2];
                                for(int i = 0; i < friendArray.length; i++){
                                        tempArray[0] = friendArray[i];
                                        tempArray[1] = lineArray[0];
                                        Arrays.sort(tempArray);
                                        output.collect(new Text(tempArray[0] + " " + tempArray[1]), new Text(lineArray[1]));
                                }
                        }
                }
        }
 
        public static class Reduce extends MapReduceBase
                implements Reducer<Text, Text, Text, Text>{
                public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException{
                        Text[] texts = new Text[2];
                        int index = 0;
                        while(values.hasNext()){
                                texts[index++] = new Text(values.next());
                        }
                        String[] list1 = texts[0].toString().split(" ");
                        String[] list2 = texts[1].toString().split(" ");
                        List<String> list = new LinkedList<String>();
                        for(String friend1 : list1){
                                for(String friend2 : list2){
                                        if(friend1.equals(friend2)){
                                                list.add(friend1);
                                        }
                                }
                        }
                        StringBuffer sb = new StringBuffer();
                        for(int i = 0; i < list.size(); i++){
                                sb.append(list.get(i));
                                if(i != list.size() - 1)
                                        sb.append(" ");
                        }
                        output.collect(key, new Text(sb.toString()));
                }
        }
 
        public static void main(String[] args) throws Exception{
                // Configure the job with the classic (org.apache.hadoop.mapred) API.
                JobConf conf = new JobConf(Friend.class);
                conf.setJobName("Friend");

                conf.setMapperClass(Map.class);
                conf.setReducerClass(Reduce.class);

                // Both the intermediate and the final key/value types are Text.
                conf.setMapOutputKeyClass(Text.class);
                conf.setMapOutputValueClass(Text.class);

                conf.setOutputKeyClass(Text.class);
                conf.setOutputValueClass(Text.class);

                // args[0] is the input path and args[1] the output path.
                FileInputFormat.setInputPaths(conf, new Path(args[0]));
                FileOutputFormat.setOutputPath(conf, new Path(args[1]));

                // Submit the job and wait for it to complete.
                JobClient.runJob(conf);
        }
}
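As a usage note (assuming a Hadoop 1.x installation, since the code uses the classic org.apache.hadoop.mapred API): compile Friend.java against the hadoop-core jar, package the classes into a jar, and launch the job with hadoop jar, passing the input and output directories as the two arguments. The input directory should contain the friend list shown in the Problem section.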
