Cloud Computing Labs: MapReduce Programming

The primary purpose of this assignment is to familiarize you with Hadoop and MapReduce programming. You will be asked to work through a few tutorials. The first part of the assignment does not involve actual coding, but it does require a fair amount of command-line work (running Hadoop jobs, copying files around, etc.). In the second part, you will develop a MapReduce version of the KMeans clustering algorithm.

Part 1: Getting Familiar with Hadoop

For this project, you will use your own single-node Hadoop system on your Linux box (or virtual machine). You were asked to install the single-node Hadoop system in the previous exercise. If you run into problems, please let us know.

After you set up the Hadoop system, work through the HDFS shell commands and the MapReduce tutorial with the WordCount source code, which can be found in "share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-*-sources.jar" under your Hadoop installation directory.
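For example, assuming a local text file named mytext.txt (a hypothetical name), you can create an HDFS directory and upload the file for your test run with commands like:

hdfs dfs -mkdir -p wordcount_input
hdfs dfs -put mytext.txt wordcount_input/
hdfs dfs -ls wordcount_input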

Now, answer the following questions:

Question 1.1 Please follow the *command-line instructions* in the lecture slides to compile and run the WordCount program. You can use the HDFS commands to upload a text file for your test run. Post a screenshot of your run.

The output files can be conveniently checked with a command like this:

hdfs dfs -cat your_output_directory/part-r-00000 | less

Question 1.2 Approximately how many hours did you spend on this task (including the time you previously spent installing the Hadoop system)?

Question 1.3 How useful is this task to your understanding of the Hadoop/MapReduce environment?


Part 2: MapReduce KMeans Algorithm

In this task, you will implement a MapReduce version of the KMeans algorithm. We have discussed the KMeans algorithm in class. Briefly, each KMeans iteration has two steps: (1) determine the cluster label for each data point using the current centroids, and (2) re-calculate the centroids based on the updated cluster labels. You can find a detailed description of KMeans if needed. In this project, first review the standalone Java implementation of KMeans and test it with the sample data.csv; the CSVHelper.java file is provided and can be reused in your MapReduce implementation. Then, you will take a step-by-step approach to move the program to the MapReduce framework and learn the following key components: setting the template parameters, the Map function, the Reduce function, setting global parameters, passing parameters to the Map/Reduce functions, and iteratively running MapReduce jobs.
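As a quick refresher, the two steps of one iteration look roughly like the following in plain Java. This is only an illustrative sketch; the provided KMeans.java is the reference implementation and may differ in its details.

// Illustrative sketch of one KMeans iteration (not the provided KMeans.java).
// points: n x d data matrix; centroids: k x d current centroids.
static int nearestCentroid(double[] p, double[][] centroids) {
    int best = 0;
    double bestDist = Double.MAX_VALUE;
    for (int c = 0; c < centroids.length; c++) {
        double dist = 0.0;
        for (int j = 0; j < p.length; j++) {
            double diff = p[j] - centroids[c][j];
            dist += diff * diff;                  // squared Euclidean distance
        }
        if (dist < bestDist) { bestDist = dist; best = c; }
    }
    return best;
}

static double[][] oneIteration(double[][] points, double[][] centroids) {
    int k = centroids.length, d = centroids[0].length;
    double[][] sums = new double[k][d];
    int[] counts = new int[k];
    // Step 1: assign each point to its nearest centroid.
    for (double[] p : points) {
        int c = nearestCentroid(p, centroids);
        counts[c]++;
        for (int j = 0; j < d; j++) sums[c][j] += p[j];
    }
    // Step 2: recompute each centroid as the mean of its assigned points
    // (an empty cluster simply keeps its old centroid here).
    for (int c = 0; c < k; c++) {
        if (counts[c] == 0) { sums[c] = centroids[c]; continue; }
        for (int j = 0; j < d; j++) sums[c][j] /= counts[c];
    }
    return sums;
}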

You can compile and run the KMeans program with the following command lines.

javac *.java
java KMeans

After you review the KMeans Java implementation, please answer the following questions:

Question 2.1 Under what condition does the KMeans iteration stop (i.e., when is convergence reached)?

Question 2.2 What is the potential problem with the current convergence criterion, compared to the alternative of declaring convergence only when no record's cluster label changes anymore?

The basic idea of the MapReduce KMeans algorithm is to cast the two steps as the Mapper and Reducer functions, respectively. The first step applies to each data point independently, which fits Mapper processing. The second step is a typical aggregation process, where each Reducer can take care of re-calculating one or a few centroids. In the following, you will see the basic design of the MapReduce KMeans algorithm. Your task is to implement this design.

We store the centroids in a file in HDFS, where each line contains the cluster ID and its centroid in the following form.

0\t1.0,1.0
1\t10.0,10.0
where "\t" is the tab character. The data records are stored in the data file as: the record ID +"\t" + the comma seperated values. Check the sample centroids file and the data file for the MapReduce KMeans algorithm. You may upload both files to your HDFS directory.

In the Mapper class, the centroids are loaded from an HDFS file via the setup() function. The map function then processes the text input (e.g., a line in the Data.hdfs file) and converts each line to a vector. By comparing the distances between this vector and the centroids, you can determine its cluster membership. The output key-value pair will be (ClusterID, record_in_string_representation).

In the Reducer class, all records with the same ClusterID are aggregated at the same Reducer. The reduce function converts the string records back to numerical vectors and computes the updated centroid. The output key-value pair will be (ClusterID, centroid_in_string_representation). Note that in this implementation we only keep the centroids; there are no cluster labels in the output.
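When emitting the updated centroid, the Reducer needs to encode it back into the comma-separated string form described above. A minimal sketch (the helper name formatCentroid is hypothetical):

// Encode a centroid as "v1,v2,..." for the (ClusterID, centroid) output pair.
static String formatCentroid(double[] centroid) {
    StringBuilder sb = new StringBuilder();
    for (int j = 0; j < centroid.length; j++) {
        if (j > 0) sb.append(",");
        sb.append(centroid[j]);
    }
    return sb.toString();
}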

The iteration will be implemented in the main function. You may want to review parameter passing via the Configuration object and HDFS file operations (both discussed below) before looking at the following example.

Configuration conf = new Configuration();
//We run the job in iterations
for (int i=0; i<niter; i++){
   Job job = new Job(conf, "KMeans");
  // setting up the parameters for conf and the job
  ...
  
  FileSystem fs = FileSystem.get(conf);

   job.waitForCompletion(true);
   String cf = conf.get("centroids");
   double [][] centers0 = load_centers(cf); // a function to load the centroids from the file
   fs.delete( new Path(cf), true); // remove the old centroids file 
   
   //Depending on the number of reducers you set, you may have multiple Reducer output parts in the output_path. 
   //You should merge all the files output/part-r-* into one file and save it to the centroids file. 
   //Check FileUtil class for APIs that you can use. 

   fs.delete(new Path(output_path), true);
   double [][] centers1 = load_centers(cf); // load the newly merged centroids
   if (converge(centers0,centers1, threshold))
      break;
}
You should read the FileUtil class API for file utility functions (similar to the HDFS shell commands) that you can call from your program. In the following, we discuss how to pass parameters from the main function to the Map/Reduce functions, and how to perform HDFS file operations.
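For the merge step in the loop above, one option is to concatenate the output/part-r-* files yourself with FileSystem and IOUtils (org.apache.hadoop.io.IOUtils); FileUtil.copyMerge is a one-call alternative on Hadoop versions that still provide it. A minimal sketch, reusing fs, conf, output_path, and cf from the main loop (FileStatus, FSDataInputStream, and FSDataOutputStream come from org.apache.hadoop.fs):

// Merge all Reducer output parts into the centroids file cf.
FSDataOutputStream merged = fs.create(new Path(cf), true);          // overwrite cf
for (FileStatus part : fs.globStatus(new Path(output_path + "/part-r-*"))) {
    FSDataInputStream in = fs.open(part.getPath());
    IOUtils.copyBytes(in, merged, conf, false);                     // false: keep 'merged' open
    in.close();
}
merged.close();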

You will need to pass parameters to the Map/Reduce functions by using the Configuration object. Here is an example:

// in your main function, when you set up the parameters,
// you can put key-value pairs into the Configuration object.
// Note: set them BEFORE creating the Job, because the Job copies
// the Configuration when it is constructed.
Configuration conf = new Configuration();
conf.set("variable name", "variable value");
Job job = new Job(conf, "KMeans");


// in the setup function of the Mapper class
// you can retrieve the variable with
Configuration conf = context.getConfiguration();
String filename = conf.get("variable name");
// filename will contain "/user/your_account/centroids"
You can pass variables of different types, as described in the Configuration class API.
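For example, numeric parameters such as the number of clusters or the convergence threshold can be passed with the typed setters and getters (the parameter names below are hypothetical):

// in main(), before creating the Job
conf.setInt("kmeans.k", 3);
conf.setFloat("kmeans.threshold", 0.0001f);

// in the Mapper's or Reducer's setup()
int k = conf.getInt("kmeans.k", 3);                       // 3 is the default value
float threshold = conf.getFloat("kmeans.threshold", 0.0001f);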

Reading an HDFS file is slightly different from reading a local file. You will use this in the load_centers function. Also, in the Mapper class, the centroids are read from the centroids file in the setup() function and stored in the variable

double [][] _centroids;

Here is a sketch for accessing an HDFS file in the Mapper's setup() function.

public void setup(Context context) throws IOException, InterruptedException {

    Configuration conf = context.getConfiguration();
    // get the HDFS filename of the centroids file from the conf object
    String filename = conf.get("centroids");

    Path path = new Path(filename);
    FileSystem fs = FileSystem.get(conf);
    try {
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
        // do something with the file content (e.g., parse each line into _centroids)
        br.close();
    } catch (Exception e) {
        ...
    }
}
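The load_centers helper used in the main loop can follow the same pattern on the driver side. Here is a minimal sketch that combines the HDFS reading pattern above with the line parsing shown earlier; it assumes conf and the number of clusters k are available (e.g., as fields of the driver class):

// A possible load_centers implementation (driver side), assuming the
// "clusterID\tv1,v2,..." format described above.
double[][] load_centers(String filename) throws IOException {
    double[][] centers = new double[k][];              // k is the number of clusters
    FileSystem fs = FileSystem.get(conf);              // conf is the driver's Configuration
    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
    String line;
    while ((line = br.readLine()) != null) {
        String[] kv = line.split("\t");
        String[] parts = kv[1].split(",");
        double[] center = new double[parts.length];
        for (int j = 0; j < parts.length; j++)
            center[j] = Double.parseDouble(parts[j]);
        centers[Integer.parseInt(kv[0])] = center;     // index by cluster ID
    }
    br.close();
    return centers;
}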

You should now have all the pieces for implementing the MapReduce program. Answer the following questions according to your implementation.

Question 2.3 Have you successfully compiled your code and generated the expected output? (yes or no)

Question 2.4 How do you instantiate the Mapper template, i.e., what actual types do you choose for Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>? Similarly, how do you instantiate the Reducer template, i.e., what actual types do you choose for Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>?

Question 2.5 How do you implement parameter passing and the setup() function in your Mapper class? Paste the code snippets here.

Question 2.6 If there are N records in the dataset, how many records are passed to the Reduce phase in the current design (i.e., without a Combiner)?

Question 2.7 A Combiner class can be used in KMeans to reduce the shuffle traffic. Please describe the design of the Combiner class (no need to implement it). Explain how much data is passed to the Reduce phase with the Combiner, in big-O notation in terms of the parameters: N records, k clusters, M mappers, and R reducers.

Question 2.8 Note that the previous MapReduce program only produces the final cluster centroids, not the cluster label for each record. Now you want to use the Hadoop streaming tool with Python-coded Mapper/Reducer programs to assign cluster labels to the records. It turns out you only need a Mapper program to assign labels. Assume that the centroids are stored in a local file, encoded in the format described earlier. Your Mapper program will take the centroids file as a parameter and output tuples (record, cluster label).

Please implement the Mapper.py program. Paste your code and the Hadoop streaming command line here.

Question 2.9 Approximately how many hours did you spend on this task?

Question 2.10 How useful is this task to your understanding of MapReduce programming? (not useful, somewhat useful, very useful)

Final Survey Questions

Question 3.1 Your level of interest in this lab exercise (high, average, low).

Question 3.2 How challenging is this lab exercise? (high, average, low)

Question 3.3 How valuable is this lab as a part of the course? (high, average, low)

Question 3.4 Are the supporting materials and lectures helpful for you to finish the project? (very helpful, somewhat helpful, not helpful)

Question 3.5 Approximately how many hours did you spend completing all the lab exercises?

Question 3.6 Do you feel confident about applying the skills learned in this lab to solve other problems with MapReduce? (low, average, high)

Deliverables


This page, last updated: Sept. 16, 2018