November 18, 2011
The trick behind writing MapReduce code in Python is that we use HadoopStreaming to pass data between the Map and Reduce steps via STDIN (standard input) and STDOUT (standard output). We simply read input data from Python's sys.stdin and write output data to sys.stdout; HadoopStreaming takes care of everything else. It really is that simple, believe it or not.
Map: mapper.py
Save the following code as /home/hadoop/mapper.py. It reads data from STDIN, splits each line into words, and outputs one line per word that maps the word to its (intermediate) count:
Note: Make sure the script has execute permission (chmod +x /home/hadoop/mapper.py).
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
Note that this script does not compute the total number of occurrences of a word. Instead, it immediately outputs "<word> 1" for every occurrence, even if the word appears several times in the input, and leaves the summing to the later Reduce step (or program). Of course, you can change the coding style to suit your own habits.
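The mapper's behavior can also be written as a small function, which makes it easy to try out without piping data through STDIN. This is only a sketch: map_words is a hypothetical helper name, not part of the scripts in this article.

```python
def map_words(lines):
    """Yield a (word, 1) pair for every word occurrence, like mapper.py does."""
    for line in lines:
        # split() without arguments splits on any run of whitespace
        for word in line.strip().split():
            yield (word, 1)


if __name__ == "__main__":
    # 'foo' occurs three times, so it is emitted three times, each with count 1
    for word, count in map_words(["foo foo quux", "labs foo"]):
        print("%s\t%s" % (word, count))
```

Nothing is summed here; repeated words simply produce repeated pairs, which is exactly what the Reduce step expects.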
Reduce: reducer.py
Save the following code as /home/hadoop/reducer.py. It reads the results of mapper.py from STDIN, sums the occurrences of each word, and writes the result to STDOUT.
Again, make sure the script has execute permission: chmod +x /home/hadoop/reducer.py
#!/usr/bin/env python

from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    try:
        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # the line had no tab or count was not a number,
        # so silently ignore / discard this line
        pass

# sort the words lexicographically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print('%s\t%s' % (word, count))
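Because Hadoop sorts the mapper output by key before it reaches the reducer, the same totals can be computed without keeping every word in a dictionary. The following is a minimal sketch of that alternative, assuming the input lines arrive sorted by word; parse and reduce_counts are hypothetical names, and this is not the reducer.py described in this article.

```python
from itertools import groupby


def parse(lines):
    """Turn 'word<TAB>count' lines into (word, int) pairs, skipping bad lines."""
    for line in lines:
        try:
            word, count = line.rstrip("\n").split("\t", 1)
            yield word, int(count)
        except ValueError:
            # no tab in the line, or the count is not a number: ignore it
            continue


def reduce_counts(sorted_lines):
    """Sum counts per word; assumes equal words are adjacent (sorted input)."""
    pairs = parse(sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


if __name__ == "__main__":
    sample = ["bar\t1", "foo\t1", "foo\t1", "quux\t1"]
    for word, total in reduce_counts(sample):
        print("%s\t%s" % (word, total))
```

The trade-off is that this version only gives correct totals when the input is sorted, whereas the dictionary-based reducer works on input in any order.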
Test your code (cat data | map | sort | reduce)
I recommend testing your mapper.py and reducer.py scripts by hand before using them in a MapReduce job. Otherwise the job may complete successfully and still return no results at all.
Here are some ideas on how to test your Map and Reduce functions:

# very basic test
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1

hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/reducer.py
bar 1
foo 3
labs 1
quux 2

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
[...]
(you get the idea)
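The cat | map | sort | reduce pipeline can also be mimicked in plain Python, which is handy for quick checks on small inputs. This is a sketch under the assumption that sorting the mapper's output lines stands in for the sort step between the two scripts; run_local is a hypothetical name.

```python
def run_local(lines):
    """Mimic 'cat data | mapper | sort | reducer' in-process for word count."""
    # Map: one 'word<TAB>1' line per word occurrence
    mapped = ["%s\t1" % word for line in lines for word in line.split()]
    # Sort: stands in for the 'sort' command between the two scripts
    mapped.sort()
    # Reduce: sum the counts per word
    totals = {}
    for entry in mapped:
        word, count = entry.split("\t", 1)
        totals[word] = totals.get(word, 0) + int(count)
    return sorted(totals.items())


if __name__ == "__main__":
    for word, total in run_local(["foo foo quux labs foo bar quux"]):
        print("%s\t%s" % (word, total))
```

For the sample line from the basic test, this gives bar 1, foo 3, labs 1 and quux 2, the same totals the shell pipeline should report.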

Running the Python code on Hadoop
For this example, we will use three ebooks as input:
The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
The Notebooks of Leonardo Da Vinci
Ulysses by James Joyce
Download each ebook as a plain-text file in us-ascii encoding and, after decompressing, store the files in a temporary directory such as /tmp/gutenberg.
hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
total 3592
-rw-r--r-- 1 hadoop hadoop 674425 2007-01-22 12:56 20417-8.txt
-rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
-rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
hadoop@ubuntu:~$
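Since the files are expected to be plain us-ascii text, a quick check before uploading can catch stray non-ASCII bytes. The following is a minimal sketch; is_ascii_bytes and is_ascii_file are hypothetical helper names, and the file paths are whatever you pass on the command line.

```python
import sys


def is_ascii_bytes(data):
    """Return True if every byte is in the 7-bit ASCII range."""
    try:
        data.decode("ascii")
        return True
    except UnicodeDecodeError:
        return False


def is_ascii_file(path):
    """Read a file in binary mode and check that it is pure ASCII."""
    with open(path, "rb") as handle:
        return is_ascii_bytes(handle.read())


if __name__ == "__main__":
    # usage (assumed script name): python check_ascii.py /tmp/gutenberg/*.txt
    for path in sys.argv[1:]:
        print("%s\t%s" % (path, "ascii" if is_ascii_file(path) else "NOT ascii"))
```

Reading the whole file at once is fine for inputs of a few megabytes, as here; much larger files would call for checking in chunks.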
Copy local data to HDFS
Before we run the actual MapReduce job, we first need to copy the files from the local file system to HDFS:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
/user/hadoop/gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
Found 3 items
/user/hadoop/gutenberg/20417-8.txt 674425
/user/hadoop/gutenberg/7ldvc10.txt 1423808
/user/hadoop/gutenberg/ulyss12.txt 1561677
Run the MapReduce job
Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As mentioned above, we use HadoopStreaming to pass data between the Map and Reduce code via STDIN and STDOUT.
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar \
    -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/* \
    -output gutenberg-output
If you want to change some Hadoop settings on the fly, such as increasing the number of Reduce tasks, you can use the "-jobconf" option:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar \
    -jobconf mapred.reduce.tasks=16 -mapper …
An important note about mapred.map.tasks: Hadoop does not honor this setting.
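When the job is launched from a script, the command line can be assembled programmatically. The sketch below only builds the argument list, using nothing but the options that appear in this article (-jobconf, -mapper, -reducer, -input, -output). build_streaming_cmd is a hypothetical helper, and the default jar path is simply the one used in this walkthrough.

```python
def build_streaming_cmd(mapper, reducer, input_path, output_path, jobconf=None,
                        jar="contrib/streaming/hadoop-0.19.1-streaming.jar"):
    """Return the HadoopStreaming command as a list of arguments."""
    cmd = ["bin/hadoop", "jar", jar]
    # one '-jobconf key=value' pair per setting, in a stable (sorted) order
    for key, value in sorted((jobconf or {}).items()):
        cmd += ["-jobconf", "%s=%s" % (key, value)]
    cmd += ["-mapper", mapper, "-reducer", reducer,
            "-input", input_path, "-output", output_path]
    return cmd


if __name__ == "__main__":
    cmd = build_streaming_cmd("/home/hadoop/mapper.py", "/home/hadoop/reducer.py",
                              "gutenberg/*", "gutenberg-output",
                              jobconf={"mapred.reduce.tasks": 16})
    print(" ".join(cmd))
```

The list can be printed for inspection, as here, or passed to a process launcher; the function itself starts nothing.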
The job reads all the files in the HDFS directory gutenberg, processes them, and stores the results in a single result file in the HDFS directory gutenberg-output.
The output of the run above looks like this:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar \
    -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/* \
    -output gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/] [] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process: 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob: map 0% reduce 0%
[...] INFO streaming.StreamJob: map 43% reduce 0%
[...] INFO streaming.StreamJob: map 86% reduce 0%
[...] INFO streaming.StreamJob: map 100% reduce 0%
[...] INFO streaming.StreamJob: map 100% reduce 33%
[...] INFO streaming.StreamJob: map 100% reduce 70%
[...] INFO streaming.StreamJob: map 100% reduce 77%
[...] INFO streaming.StreamJob: map 100% reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...] INFO streaming.StreamJob: Output: gutenberg-output
hadoop@ubuntu:/usr/local/hadoop$
As the output above shows, the job ran to completion. Hadoop also provides a basic web interface for statistics and information.
When the Hadoop cluster is running, you can open http://localhost:50030/ in a browser, as shown in the figure.
Check that the results were stored in the HDFS directory gutenberg-output:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg-output
Found 1 items
/user/hadoop/gutenberg-output/part-00000 903193 2007-09-21 13:00
hadoop@ubuntu:/usr/local/hadoop$
You can then inspect the contents of the result file with the dfs -cat command:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat gutenberg-output/part-00000
"(Lo)cra" 1
"1490 1
"1498," 1
"35" 1
"40," 1
"A 2
"AS-IS". 2
"A_ 1
"Absoluti 1
[...]
hadoop@ubuntu:/usr/local/hadoop$
Note that the quotation marks (") in the output above were not inserted by Hadoop. They are part of the words themselves, because mapper.py splits the text on whitespace only.
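The quotation marks come from the input text itself: splitting on whitespace keeps any punctuation attached to a word. The sketch below reproduces this and shows one possible way to strip surrounding punctuation in a mapper. The normalize function is a hypothetical addition, not part of the original scripts, and lowercasing is an extra assumption about what counts as the same word.

```python
import string


def normalize(word):
    """Strip leading/trailing ASCII punctuation and lowercase the word."""
    return word.strip(string.punctuation).lower()


if __name__ == "__main__":
    raw = '"A said "AS-IS".'.split()
    # the raw tokens still carry their quotation marks and the full stop
    print(raw)
    # punctuation inside a word (the hyphen in AS-IS) is left untouched
    print([normalize(word) for word in raw])
```

Whether to normalize at all is a design choice: it merges variants such as "A and A into one key, but it also changes the counts compared with the output shown above.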