Upload a file to Hadoop DFS
To upload a file (e.g. purchases.txt) to Hadoop DFS, use the following command:
$hadoop fs -put purchases.txt myinput
This will upload the file "purchases.txt" to Hadoop DFS inside the directory named "myinput" (if "myinput" does not already exist as a directory, the file is simply stored in HDFS under that name).
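For the examples that follow, assume "purchases.txt" is a small tab-separated file with 4 columns. This sample data is purely hypothetical; only the 4-column layout matters:
storeA	item1	2013-01-05	10.50
storeB	item2	2013-01-05	3.75
storeA	item3	2013-01-06	4.25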
List files in Hadoop DFS
To list files in Hadoop DFS, use the following command:
$hadoop fs -ls
This will list the files in your home directory on Hadoop DFS.
To list files in "myinput", use the following command:
$hadoop fs -ls myinput
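The listing output will look roughly like the following (the owner "training", the size, and the timestamp shown here are hypothetical and will differ on your system):
Found 1 items
-rw-r--r--   1 training supergroup       3428 2013-05-01 12:34 myinput/purchases.txt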
Process files in Hadoop DFS using mapper and reducer programs
To process data in files uploaded to Hadoop DFS via mapper and reducer programs, use the following command:
$hadoop jar [hadoop.jar location_path] -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input myinput -output joboutput
Where [hadoop.jar location_path] is the path to the Hadoop Streaming jar file, mapper.py is the mapper Python program, reducer.py is the reducer Python program, "myinput" is the HDFS input fed to mapper.py, and "joboutput" is the HDFS output folder which stores the output from reducer.py.
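As a concrete illustration, on a typical CDH installation the streaming jar often lives at a path like "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar" (the exact path depends on your distribution and version), so the full command might look like:
$hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input myinput -output joboutput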
After running the above command, the user can list the contents of "joboutput" using the following command:
$hadoop fs -ls joboutput
Note that the actual output is stored in the file "part-00000" within the "joboutput" folder. To view this file, run the following command:
$hadoop fs -cat joboutput/part-00000
To download this file to the local machine, run the following command:
$hadoop fs -get joboutput/part-00000 output.txt
This will save the output to the local file "output.txt".
To rerun the Hadoop processing on "myinput", remember to first delete the previously created "joboutput" folder using the following command:
$hadoop fs -rm -r -f joboutput
Alternatively, just specify a different output name such as "joboutput2".
If you are using Cloudera CDH, you can run the MapReduce job using the command:
$hs mapper.py reducer.py myinput joboutput
The above line replaces the longer command "hadoop jar [hadoop.jar location_path] -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input myinput -output joboutput"; the "hs" shortcut is a convenience provided by the Cloudera CDH distribution.
Simple Mapper and Reducer programs in Python
In this section, I will create simple programs, mapper.py and reducer.py, which sum the values of the 4th column in "myinput" (which is purchases.txt), grouped by the values in the 1st column. Below shows the content of a simple mapper.py:
#!/usr/bin/env python
import sys

def mapper():
    for line in sys.stdin:
        # strip off extra whitespace, split on tab and put the data in an array
        data = line.strip().split("\t")
        # only process well-formed lines with exactly 4 columns
        if len(data) == 4:
            keyItem, item2, item3, valueItem = data
            print("{0}\t{1}".format(keyItem, valueItem))

mapper()
The above mapper.py program assumes that the input file (e.g., purchases.txt mentioned earlier) has 4 columns, where the first column is used as the key and the last column is used as the value. The mapper program simply writes "[keyItem]\t[valueItem]" lines to standard output, which Hadoop then delivers to the reducer program.
When the "[keyItem]\t[valueItem]" records are passed to the reducer program, Hadoop performs a shuffle and sort so that all records arriving at the reducer are sorted by the "[keyItem]" value.
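You can mimic this shuffle-and-sort step locally, outside Hadoop, by piping the mapper output through the Unix sort command (this is only a rough local approximation of what the framework does):
$cat purchases.txt | python mapper.py | sort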
Below shows the content of a simple reducer.py:
#!/usr/bin/env python
import sys

def reducer():
    currentKey = None
    currentTotalValue = 0
    for line in sys.stdin:
        # each input line is "[keyItem]\t[valueItem]", already sorted by key
        data = line.strip().split("\t")
        if len(data) != 2:
            continue
        keyItem, valueItem = data
        # when the key changes, emit the total for the previous key
        if currentKey is None or currentKey != keyItem:
            if currentKey is not None:
                print("{0}\t{1}".format(currentKey, currentTotalValue))
            currentKey = keyItem
            currentTotalValue = 0
        currentTotalValue += float(valueItem)
    # emit the total for the last key
    if currentKey is not None:
        print("{0}\t{1}".format(currentKey, currentTotalValue))

reducer()
The above reducer program sums the valueItem values for each keyItem. This works in the above implementation because the stdin input for the reducer is sorted by "[keyItem]", so all values for a given key arrive consecutively.
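Note that for Hadoop Streaming to execute the scripts directly (as in the earlier command), they generally need the shebang line shown above plus execute permission:
$chmod +x mapper.py reducer.py
The whole job can also be tested locally before submitting it to the cluster, with sort standing in for Hadoop's shuffle and sort. On the hypothetical sample data shown earlier, this prints the per-store totals:
$cat purchases.txt | python mapper.py | sort | python reducer.py
storeA	14.75
storeB	3.75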