Cloud Computing Labs: MapReduce Programming

The primary purpose of this assignment is to familiarize you with Hadoop and MapReduce programming. You will be asked to work through a few tutorials. The first part of the assignment does not involve actual coding, but requires a lot of command-line activity (running Hadoop jobs, copying files around, etc.). In the second part, you will develop a MapReduce version of the KMeans clustering algorithm.

Part 1: Getting Familiar with Hadoop

For this project, you can use the Hadoop system on nimbus17.cs.wright.edu or your own installation on a Linux box. We have created accounts for you; if you cannot log in, please let me know. After you log in, answer the following questions:

Question 1.1 Please copy the file /usr/local/hadoop/src/examples/org/apache/hadoop/examples/WordCount.java to your own directory and follow the commands in the MapReduce tutorial to compile and run the program. You can use the existing HDFS file /user/hadoop/text, or upload any text file for testing:

hadoop fs -put your_local_file your_HDFS_file
In addition to the Hadoop core jar library (/usr/local/hadoop/hadoop-core-1.0.3.jar for the version installed on nimbus17), make sure that you also include /usr/local/hadoop/lib/commons-cli-1.2.jar in your classpath when you compile the code. The paths to these library files may vary according to the Hadoop version you install. Have you successfully run the WordCount program on /user/hadoop/text? (yes or no)
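For reference, a typical compile-and-run sequence might look like the following sketch. The jar paths match the nimbus17 installation described above; the class-output directory, jar name, and output directory are placeholders of your choice:

mkdir wordcount_classes
javac -classpath /usr/local/hadoop/hadoop-core-1.0.3.jar:/usr/local/hadoop/lib/commons-cli-1.2.jar -d wordcount_classes WordCount.java
jar -cvf wordcount.jar -C wordcount_classes/ .
hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/hadoop/text your_output_directory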

Look at the output file part-r-00000 in your output directory with the command:

hadoop fs -cat your_output_directory/part-r-00000 | less

Question 1.2 What is the next term found in the collection after "measured"? How many times does it appear?

Question 1.3 How many hours approximately did you spend on this task?

Question 1.4 How useful is this task to your understanding of the Hadoop environment?


Part 2: MapReduce KMeans Algorithm

In this task, you will implement a MapReduce version of the KMeans algorithm. We have discussed the KMeans algorithm in class. Briefly, each KMeans iteration has two steps: (1) determine the cluster label for each data point with the current centroids, and (2) re-calculate the centroids based on the updated cluster labels. You can find a detailed description of KMeans on Wikipedia. In this project, you should first review the standalone Java implementation of KMeans and test it with the sample data.csv. The CSVHelper.java file is provided, and you can reuse it in your MapReduce implementation. Then, you will take a step-by-step approach to move the program to the MapReduce framework and learn the following key components: the template parameter setting, the Map function, the Reduce function, the global parameter setting, passing parameters to Map/Reduce functions, and iteratively running MapReduce jobs.
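In schematic form, one iteration of the standalone algorithm looks like the following sketch (the helpers closestCentroid and meanOfPoints are hypothetical names for illustration, not names from the provided implementation):

// one KMeans iteration over data[n][d], with centroids[k][d] and labels[n]
for (int i = 0; i < n; i++)
    labels[i] = closestCentroid(data[i], centroids);  // step 1: assign each point to the nearest centroid
for (int c = 0; c < k; c++)
    centroids[c] = meanOfPoints(data, labels, c);     // step 2: recompute each centroid as the mean of its members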

You can compile and run the KMeans program with the following command line:

javac *.java
java KMeans

After you review the KMeans Java implementation, please answer the following questions:

Question 2.1 What is a centroid?

Question 2.2 How does the implementation determine whether the KMeans iteration should stop?

Question 2.3 How is convergence defined by this implementation? Another method to determine convergence is to check whether the cluster membership of any record has changed. Implement this convergence check yourself, and paste the code and explanation here.

The basic idea of the MapReduce KMeans algorithm is to parallelize the two steps with the Mapper and Reducer functions, respectively. The first step applies to each data point independently, which fits Mapper processing. The second step is a typical aggregation process, where each Reducer can take care of re-calculating one or a few centroids. In the following, you will see the basic design of the MapReduce KMeans algorithm. Your task is to implement this design.

We store the centroids in a file in HDFS, where each line contains a cluster ID and its centroid in the following form.

0\t1.0,1.0
1\t10.0,10.0
where "\t" is the tab character. The data records are stored in the same format in the data file: the cluster ID +"\t" + the comma seperated values. The initial cluster ID for the data file can be any value. However, the initial centroids file should contain the correct cluster ID. Check the sample centroids file and the data file for the MapReduce KMeans algorithm. You may upload both files to your HDFS directory.

In the Mapper class, the centroids will be loaded from an HDFS file via the setup() function. The map function then processes the text input (e.g., a line in the Data.hdfs file) and converts each line to a vector. By checking the distance between the vector and each centroid, you can determine the cluster membership. The output key-value pair will be (ClusterID, record_in_string_representation).
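A minimal sketch of such a map function is shown below. The helpers parseVector() and closestCentroid() are hypothetical, and _centroids is the array filled in setup() (see the sample setup() code later in this section):

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // input line format: clusterID \t v1,v2,...
    String[] parts = value.toString().split("\t");
    double[] v = parseVector(parts[1]);            // hypothetical helper: parse the comma-separated values
    int cluster = closestCentroid(v, _centroids);  // hypothetical helper: index of the nearest centroid
    // emit (ClusterID, record_in_string_representation)
    context.write(new IntWritable(cluster), new Text(parts[1]));
}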

In the Reducer class, all records with the same ClusterID will be aggregated at the same Reducer. The reduce function will convert the string records to numerical records and compute the updated centroid. The output key-value pair will be (ClusterID, centroid_in_string_representation). Note that we only keep the centroids; there are no cluster labels in the output.
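A corresponding reduce function might look like the following sketch (parseVector() is the same hypothetical helper as in the map sketch above):

public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    double[] sum = null;
    int count = 0;
    for (Text val : values) {
        double[] v = parseVector(val.toString());  // convert the string record to a numerical record
        if (sum == null)
            sum = new double[v.length];
        for (int i = 0; i < v.length; i++)
            sum[i] += v[i];
        count++;
    }
    // build the comma-separated string representation of the new centroid
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < sum.length; i++) {
        if (i > 0) sb.append(",");
        sb.append(sum[i] / count);
    }
    context.write(key, new Text(sb.toString()));
}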

The iteration will be implemented in the main function. You may want to review parameter passing via Configuration and the HDFS file operations before looking at the following example.

Configuration conf = new Configuration();
// setting up the parameters in conf for the job (see the Configuration example below)
...

FileSystem fs = FileSystem.get(conf);
// After setting up everything, we run the job in iterations
for (int i = 0; i < niter; i++){
   Job job = new Job(conf, "KMeans");
   // set the Mapper/Reducer classes, output key/value types, and input path here
   ...
   FileOutputFormat.setOutputPath(job, new Path(output_path));
   job.waitForCompletion(true);

   String cf = conf.get("centroids");
   double[][] centers0 = load_centers(cf); // a function to load the centroids from the file
   FileUtil.fullyDelete(fs, new Path(cf)); // remove the old centroids file

   // Typically, you have multiple Reducer outputs. You should merge all the files
   // output/part-r-* into one file and save it to the centroids file.
   // Check the FileUtil class for APIs that you can use.
   FileUtil.fullyDelete(fs, new Path(output_path));
   double[][] centers1 = load_centers(cf); // load the updated centroids
   if (converge(centers0, centers1, threshold))
      break;
}
You should read the FileUtil class API, which provides HDFS shell-like file operations for use in your program. In the following, we discuss how to pass parameters from the main function to the Map/Reduce functions, and the HDFS file operations.
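For example, FileUtil.copyMerge() (available in Hadoop 1.x) is one possible way to implement the merge step in the loop above; the variable names fs, output_path, cf, and conf follow the earlier sketch:

// merge all files under output_path (e.g., part-r-*) into the single centroids file cf
FileUtil.copyMerge(fs, new Path(output_path),  // source directory holding the Reducer outputs
                   fs, new Path(cf),           // destination file: the centroids file
                   false,                      // do not delete the source directory here
                   conf, null);                // no separator string appended between files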

You will need to pass parameters to the Map/Reduce functions by using the Configuration object. Here is an example:

// in your main function, when you set up the parameters,
// put key-value pairs into the Configuration object
// before creating the Job (the Job takes a copy of the Configuration)
Configuration conf = new Configuration();
// variable name is "centroids"
// variable value is "/user/your_account/centroids"
conf.set("centroids", "/user/your_account/centroids");
Job job = new Job(conf, "KMeans");


// in the setup function of the Mapper class
// you can retrieve the variable with
Configuration conf = context.getConfiguration();
String filename = conf.get("centroids");
// filename will contain "/user/your_account/centroids"
You can pass variables of other types as well; check the Configuration class API.
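For instance, Configuration also provides typed setters and getters (the variable name "k" here is just an illustration):

conf.setInt("k", 2);          // in the main function: store an integer parameter
int k = conf.getInt("k", 2);  // in setup(): retrieve it, with 2 as the default value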

Reading an HDFS file is a bit special. You will need it in the load_centers function. Also, in the Mapper class, the centroids will be read from the centroids file in the setup() function and stored in the variable

double [][] _centroids;

Here is a piece of sample code for accessing an HDFS file in the Mapper's setup() function.

public void setup(Context context){

    Configuration conf = context.getConfiguration();
    String filename = conf.get("centroids");

    Path p = new Path(filename);
    try{
        FileSystem fs = FileSystem.get(conf);
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));
        String line = br.readLine();
        while (line != null){
            // parse the line and fill in _centroids
            line = br.readLine(); // advance to the next line, or the loop never ends
        }
        br.close();
    }catch(Exception e){
        System.err.println(e);
    }
}

I believe you have all the pieces for implementing the MapReduce program. Answer the following questions according to your implementation.

Question 2.4 How do you instantiate the Mapper template? That is, what are the actual types that you choose to instantiate Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>?

Question 2.5 How do you instantiate the Reducer template? That is, what are the actual types that you choose to instantiate Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>?

Question 2.6 If you determine convergence by checking that no cluster label has changed, how would you design your program? Briefly describe your design.

Question 2.7 If there are N records in the dataset, how many records are passed to the Reduce phase in the current design? Is there any method to reduce this traffic? Briefly describe your design. (Hint: use a Combiner function.)

Question 2.8 How many hours approximately did you spend on this task?

Question 2.9 How useful is this task to your understanding of MapReduce programming? (not useful, somewhat useful, very useful)

Final Survey Questions

Question 3.1 What is your level of interest in this lab exercise? (high, average, low)

Question 3.2 How challenging is this lab exercise? (high, average, low)

Question 3.3 How valuable is this lab as a part of the course? (high, average, low)

Question 3.4 Are the supporting materials and lectures helpful for you to finish the project? (very helpful, somewhat helpful, not helpful)

Question 3.5 How many hours approximately did you spend completing all the lab exercises?

Question 3.6 Do you feel confident applying the skills learned in this lab to solve other problems with MapReduce? (low, average, high)

Deliverables

Turn in two PDF files plus one zip file of source code to Pilot: (1) a report that answers all of the questions; (2) a PDF file containing all of your source code; and (3) a zip file that contains all source files and a README describing how to compile and run your program. The grader may need to check your source files. Also, turn in a hard copy of the report in class on April 9.


This page was first created on 18 Feb 2014 and last updated on 28 Mar 2015.