Tuesday, September 2, 2014

Basic Hadoop Command Lines and Simple mapper and reducer programs

Upload a file to Hadoop DFS

To upload a file (e.g. purchases.txt) to Hadoop DFS, use the following command:

$hadoop fs -put purchases.txt myinput

This will upload the file "purchases.txt" to Hadoop DFS under the directory named "myinput".
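Note that if "myinput" does not already exist as a directory in Hadoop DFS, -put will typically upload purchases.txt as a single file named "myinput" instead. If in doubt, create the directory explicitly first:

$hadoop fs -mkdir myinput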

List files in Hadoop DFS

To list files in Hadoop DFS, use the following command:

$hadoop fs -ls

This will list the files in the user's home directory on Hadoop DFS.

To list files in "myinput", use the following command:

$hadoop fs -ls myinput
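To list the contents recursively, the -R flag can be added (on Hadoop 2 and later; older releases use the deprecated -lsr form):

$hadoop fs -ls -R myinput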

Process files in Hadoop DFS using mapper and reducer programs

To process data in files uploaded to Hadoop DFS with mapper and reducer programs, use the Hadoop Streaming command:

$hadoop jar [path to hadoop-streaming jar] -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input myinput -output joboutput

Here, [path to hadoop-streaming jar] is the location of the Hadoop Streaming jar file; mapper.py is the mapper Python program and reducer.py is the reducer Python program; "myinput" is the HDFS input directory fed to mapper.py; and "joboutput" is the HDFS output folder that stores the results from reducer.py. The two -file options ship the local scripts to the cluster nodes so every mapper and reducer task can run them.
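As a concrete example, on a CDH installation the Hadoop Streaming jar typically lives under /usr/lib/hadoop-mapreduce (this path is an assumption; it varies by distribution and version):

$hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input myinput -output joboutput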

After running the above command, the user can list the contents of "joboutput" using the following command:

$hadoop fs -ls joboutput

Note that the actual output is stored in the file "part-00000" within the "joboutput" folder. To view this file, run the following command:

$hadoop fs -cat joboutput/part-00000
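If the job ran with more than one reducer, there will be several part files (part-00000, part-00001, and so on); a glob pattern views them all at once:

$hadoop fs -cat joboutput/part-*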

To download this file to the local machine, run the following command:

$hadoop fs -get joboutput/part-00000 output.txt

This will save the output to the local file "output.txt".
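Alternatively, the -getmerge command concatenates all part files in the folder into a single local file:

$hadoop fs -getmerge joboutput output.txt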

To rerun the Hadoop job on "myinput", first delete the previously created "joboutput" folder (Hadoop refuses to write into an existing output directory) using the following command:

$hadoop fs -rm -r -f joboutput

Alternatively, just specify a different output name, such as "joboutput2".

If you are using Cloudera CDH, you can run the MapReduce job using the command:

$hs mapper.py reducer.py myinput joboutput 

The above line replaces the longer "hadoop jar [path to hadoop-streaming jar] -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input myinput -output joboutput" command, and is provided as a convenience shortcut by CDH.
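For reference, below is a minimal sketch of what such a shortcut could look like as a shell function; the streaming jar path is an assumption and must match your installation:

hs() {
    # expand "hs mapper.py reducer.py input output" into the full streaming command
    hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
        -mapper "$1" -reducer "$2" -file "$1" -file "$2" \
        -input "$3" -output "$4"
}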

Simple Mapper and Reducer programs in Python

In this section, I will create two simple programs, mapper.py and reducer.py, which sum the values of the 4th column of "myinput" (i.e., purchases.txt), grouped by the values in its 1st column.

Below is the content of a simple mapper.py:

#!/usr/bin/python
import sys

def mapper():
    for line in sys.stdin:
        # strip off extra whitespace, split on tab and put the data in an array
        data = line.strip().split("\t")
        # only process well-formed lines with exactly 4 columns
        if len(data) == 4:
            keyItem, item2, item3, valueItem = data
            print "{0}\t{1}".format(keyItem, valueItem)

mapper()

The above mapper.py program assumes that the input file (e.g., purchases.txt mentioned earlier) has 4 columns, where the first column is used as the key and the last column as the value. The mapper program simply outputs "[keyItem]\t[valueItem]" lines to the reducer program.
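To sanity-check mapper.py locally before submitting a job, feed it a made-up four-column line (the values here are purely hypothetical):

$printf "storeA\tcol2\tcol3\t9.99\n" | python mapper.py

This should print "storeA\t9.99" (a tab-separated key/value pair).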

Before the "[keyItem]\t[valueItem]" records reach the reducer program, Hadoop performs a shuffle and sort so that all records are ordered by "[keyItem]", and all records sharing the same key arrive at the same reducer consecutively.

Below is the content of a simple reducer.py:

#!/usr/bin/python
import sys

def reducer():
    currentKey = None
    currentTotalValue = 0

    for line in sys.stdin:
        # strip off extra whitespace, split on tab
        data = line.strip().split("\t")
        # skip malformed lines
        if len(data) != 2:
            continue
        keyItem, valueItem = data
        # a new key begins: emit the total accumulated for the previous key
        if currentKey != keyItem:
            if currentKey is not None:
                print "{0}\t{1}".format(currentKey, currentTotalValue)
            currentKey = keyItem
            currentTotalValue = 0
        currentTotalValue += float(valueItem)

    # emit the total for the last key
    if currentKey is not None:
        print "{0}\t{1}".format(currentKey, currentTotalValue)

reducer()

The above reducer program sums valueItem per keyItem. This single-pass implementation works because the stdin input to the reducer is already sorted by "[keyItem]", so all values for a given key arrive consecutively.
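Since the reducer relies on sorted input, the whole job can be simulated locally with a Unix pipeline on the local copy of purchases.txt, with sort standing in for Hadoop's shuffle and sort phase:

$cat purchases.txt | python mapper.py | sort | python reducer.py

If this produces the expected per-key totals locally, the same scripts can be submitted to Hadoop unchanged.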
