MapReduce is a framework for processing highly distributable
problems across huge datasets using a large number of computers
(nodes), collectively referred to as a cluster (if all nodes use
the same hardware) or a grid (if the nodes use different hardware).
Computational processing can occur on data stored either in
a filesystem (unstructured) or in a database (structured).
"Map" step: The master node takes the
input, partitions it into smaller sub-problems, and distributes
them to worker nodes. A worker node may do this again in turn,
leading to a multi-level tree structure.
The worker node processes the smaller problem, and passes the
answer back to its master node.
"Reduce" step: The master node then
collects the answers to all the sub-problems and combines them in
some way to form the output – the answer to the
problem it was originally trying to solve.
Python实现MapReduce(帮助理解mapreduce),来自网络分享
import operator
import re
import threading
import urllib

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to "queue"

try:
    from urllib import urlopen  # Python 2 location
except ImportError:
    from urllib.request import urlopen  # Python 3 location
class MapReduce(object):
    """Minimal multi-threaded MapReduce skeleton.

    To use, subclass and define the four hooks below, assign the raw input
    to ``self.data`` and then call ``self.map_reduce()``:

        parse_fn(self, data)              => [(k, v), ...]
        map_fn(self, k, v)                => [(k, v1), (k, v2), ...]
        reduce_fn(self, k, [v1, v2, ...]) => [(k, v)]
        output_fn(self, [(k, v), ...])
    """

    def __init__(self):
        self.data = None  # raw input; passed to parse_fn() by map_reduce()
        self.num_worker_threads = 5  # threads started for each phase

    class SynchronizedDict(object):  # we need this for merging
        """Dict wrapper that holds a lock around every operation."""

        def __init__(self):
            self.lock = threading.Lock()
            self.d = {}

        def isin(self, k):
            """Return True if key ``k`` is present."""
            with self.lock:
                return k in self.d

        def get(self, k):
            """Return the value stored under ``k`` (KeyError if absent)."""
            with self.lock:
                return self.d[k]

        def set(self, k, v):  # we don't need del
            """Store ``v`` under ``k``, replacing any previous value."""
            with self.lock:
                self.d[k] = v

        def set_append(self, k, v):  # for thread-safe list append
            """Append ``v`` to the list under ``k``, creating it if needed.

            Creating the list and appending happen under a single lock
            acquisition; doing the existence check and the insert as two
            separate locked calls lets two threads both "create" the same
            key, and one of the values is then lost.
            """
            with self.lock:
                self.d.setdefault(k, []).append(v)

        def items(self):
            """Return a list snapshot of the (key, value) pairs."""
            with self.lock:
                # Copy while the lock is held so the caller never iterates
                # over a live view of the underlying dict.
                return list(self.d.items())

    def create_queue(self, input_list):  # helper fn for queues
        """Return a new queue holding every value of ``input_list``."""
        output_queue = Queue.Queue()
        for value in input_list:
            output_queue.put(value)
        return output_queue

    def create_list(self, input_queue):  # helper fn for queues
        """Drain ``input_queue`` and return its items as a list."""
        output_list = []
        while True:
            try:
                item = input_queue.get_nowait()
            except Queue.Empty:
                break
            output_list.append(item)
            input_queue.task_done()
        return output_list

    def merge_fn(self, k, v, merge_dict):  # helper fn for merge
        """Add ``v`` to the list of values collected for key ``k``."""
        merge_dict.set_append(k, v)

    def process_queue(self, input_queue, fn_selector):  # helper fn
        """Run one phase over ``input_queue`` using worker threads.

        Args:
            input_queue: queue of ``(k, v)`` tuples; it is fully drained.
            fn_selector: ``'map'``, ``'reduce'`` or ``'merge'``.

        Returns:
            A new queue. For ``'map'`` / ``'reduce'`` it holds the flattened
            tuples returned by ``map_fn`` / ``reduce_fn``; for ``'merge'``
            it holds ``(k, [v1, v2, ...])`` tuples sorted by key.

        Raises:
            ValueError: if ``fn_selector`` is not one of the three names.
            Exception: the first exception raised inside a worker, re-raised
                in the calling thread once the queue has been drained.
        """
        # Validate in the calling thread: raising inside a worker would skip
        # task_done() and leave input_queue.join() waiting forever.
        if fn_selector not in ('map', 'reduce', 'merge'):
            raise ValueError("Bad fn_selector=%s" % (fn_selector,))

        output_queue = Queue.Queue()
        merge_dict = self.SynchronizedDict() if fn_selector == 'merge' else None
        errors = []  # exceptions captured inside worker threads

        def worker():
            while True:
                # Non-blocking get: with empty()-then-get() another thread
                # can take the last item in between and this one would
                # block indefinitely.
                try:
                    item = input_queue.get_nowait()
                except Queue.Empty:
                    return
                try:
                    k, v = item
                    if fn_selector == 'map':
                        result_list = self.map_fn(k, v)
                    elif fn_selector == 'reduce':
                        result_list = self.reduce_fn(k, v)
                    else:  # 'merge': collect v under the same k
                        self.merge_fn(k, v, merge_dict)
                        result_list = ()
                    for result_tuple in result_list:  # flatten
                        output_queue.put(result_tuple)
                except Exception as exc:
                    # Remember the failure; it is re-raised after join().
                    errors.append(exc)
                finally:
                    # Always acknowledge the item so join() can return.
                    input_queue.task_done()

        for _ in range(self.num_worker_threads):  # start threads
            worker_thread = threading.Thread(target=worker)
            worker_thread.daemon = True
            worker_thread.start()
        input_queue.join()  # wait for worker threads to finish

        if errors:
            raise errors[0]

        if fn_selector == 'merge':
            output_list = sorted(merge_dict.items(),
                                 key=operator.itemgetter(0))
            output_queue = self.create_queue(output_list)
        return output_queue

    def map_reduce(self):  # the actual map-reduce algorithm
        """Run parse -> map -> merge -> reduce -> output on ``self.data``."""
        data_list = self.parse_fn(self.data)
        # enqueue the data so we can multi-process
        data_queue = self.create_queue(data_list)
        # [(k,v),...] => [(k,v1),(k,v2),...]
        map_queue = self.process_queue(data_queue, 'map')
        # [(k,v1),(k,v2),...] => [(k,[v1,v2,...]),...]
        merge_queue = self.process_queue(map_queue, 'merge')
        # [(k,[v1,v2,...]),...] => [(k,v),...]
        reduce_queue = self.process_queue(merge_queue, 'reduce')
        # dequeue into a list for output handling
        output_list = self.create_list(reduce_queue)
        self.output_fn(output_list)
class WordCount(MapReduce):
    """MapReduce job that counts how often each word occurs in the input."""

    def __init__(self):
        MapReduce.__init__(self)
        # output_fn() prints only words whose count is strictly greater
        # than this threshold.
        self.min_count = 1

    def parse_fn(self, data):
        """Break ``data`` into ``(None, line)`` tuples, one per line."""
        return [(None, line) for line in data.splitlines()]

    def map_fn(self, key, str):
        """Return a ``(word, 1)`` tuple for each word in ``str``.

        ``key`` is ignored. The text is lower-cased and split on runs of
        non-word characters; anything other than ASCII letters and digits
        is then stripped from each piece and empty pieces are dropped.
        (The parameter name ``str`` is kept for interface compatibility.)
        """
        word_list = []
        for word in re.split(r'\W+', str.lower()):
            bare_word = re.sub(r"[^A-Za-z0-9]*", r"", word)
            if bare_word:
                word_list.append((bare_word, 1))
        return word_list

    def reduce_fn(self, word, count_list):
        """Sum the counts collected for ``word``."""
        return [(word, sum(count_list))]

    def output_fn(self, output_list):
        """Print a Word/Count table, highest count first.

        Only rows whose count exceeds ``self.min_count`` are printed.
        """
        # The header lines must actually be printed; as bare tuple
        # expressions they were evaluated and then discarded.
        print("%s %s" % ("Word".ljust(15), "Count".rjust(5)))
        print("%s %s" % ("______________".ljust(15), "_____".rjust(5)))
        sorted_list = sorted(output_list, key=operator.itemgetter(1),
                             reverse=True)
        for (word, count) in sorted_list:
            if count > self.min_count:
                print("%s %s" % (word.ljust(15), repr(count).rjust(5)))

    def test_with_monty(self):
        """Run the word count over a short built-in sample text."""
        self.data = """The Meaning of Life is:
try and be nice to
people,
avoid eating
fat,
read a good book every now
and then,
get some walking
in,
and try and live together in
peace and harmony
with people of all creeds and
nations."""
        self.map_reduce()

    def test_with_nietzsche(self):
        """Download a Project Gutenberg text and print its frequent words.

        Sets ``self.min_count`` to 700 before running the job.
        """
        self.min_count = 700
        f = urlopen("http://www.gutenberg.org/cache/epub/7205/pg7205.txt")
        try:
            raw = f.read()
        finally:
            f.close()  # release the connection even if read() fails
        # Python 3 returns bytes here, while the regex patterns in map_fn
        # are text patterns; decode so both operate on the same type.
        if not isinstance(raw, str):
            raw = raw.decode("utf-8", "replace")
        self.data = raw
        self.map_reduce()
class DistributedGrep(MapReduce):
    """MapReduce job that prints the input lines matching ``self.matcher``."""

    def __init__(self):
        MapReduce.__init__(self)
        # Compiled regular expression; must be set before map_reduce() runs.
        self.matcher = None

    def parse_fn(self, data):
        """Return ``(line_num, line)`` tuples, numbering lines from 1."""
        return list(enumerate(data.splitlines(), 1))

    def map_fn(self, line_num, line):
        """Return ``[(line_num, line)]`` if the line matches, else ``[]``."""
        if self.matcher.match(line):
            return [(line_num, line)]
        return []

    def reduce_fn(self, line_num, line_list):
        """Identity reducer: return the first line stored for ``line_num``."""
        # we only ever have one line in the list
        return [(line_num, line_list[0])]

    def output_fn(self, output_list):
        """Print a LineNum/Line table ordered by line number."""
        # Every row must actually be printed; as bare tuple expressions
        # the header and the matches were evaluated and then discarded.
        print("%s %s" % ("LineNum".rjust(8), "Line".ljust(70)))
        print("%s %s" % ("_______".rjust(8), "____"))
        for (line_num, line) in sorted(output_list,
                                       key=operator.itemgetter(0)):
            print("%s %s" % (repr(line_num).rjust(8), line.ljust(70)))

    def test_with_nietzsche(self):
        """Download a Project Gutenberg text and print lines containing 'Jahre'."""
        self.matcher = re.compile(r".*Jahre.*")
        f = urlopen("http://www.gutenberg.org/cache/epub/7205/pg7205.txt")
        try:
            raw = f.read()
        finally:
            f.close()  # release the connection even if read() fails
        # Python 3 returns bytes here, while the matcher is a text pattern;
        # decode so both operate on the same type.
        if not isinstance(raw, str):
            raw = raw.decode("utf-8", "replace")
        self.data = raw
        self.map_reduce()
def main():
    """Run the demo jobs: two word counts, then one distributed grep."""
    word_counter = WordCount()
    word_counter.test_with_monty()
    word_counter.test_with_nietzsche()

    grep_job = DistributedGrep()
    grep_job.test_with_nietzsche()


# Only run the demos when executed as a script, not when imported.
if __name__ == "__main__":
    main()