python调用hadoop

一、MRJob

Mrjob是一个编写MapReduce任务的开源Python框架,它实际上对Hadoop Streaming的命令行进行了封装,因此接粗不到Hadoop的数据流命令行,使我们可以更轻松、快速的编写MapReduce任务。
Mrjob通过Python的yield机制将函数变成一个生成器,通过不断调用next()去实现key:value的初始化或运算操作。

#!/usr/bin/python 
## 注意不要用/usr/bin/env python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re
class MRwordCount(MRJob):
    def mapper(self, _, line):
        pattern=re.compile(r'(\W+)')
        for word in re.split(pattern=pattern,string=line):
            if word.isalpha():
                yield (word.lower(),1)
    def reducer(self, word, count):
        l=list(count)
        yield (word,sum(l))
if __name__ == '__main__':
    MRwordCount.run() #run()方法,开始执行MapReduce任务。

执行脚本

python /wordCount.py /test.txt -r hadoop hdfs:///input hdfs:///output

二、python代码调用MapReduce

使用python写MapReduce的“诀窍”是利用Hadoop流的API,通过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。
我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout。Hadoop流将会帮助我们处理别的任何事情。

map代码:

#!/usr/bin/python
## 注意不要用/usr/bin/env python
import sys
for line in sys.stdin:  # 遍历读入数据的每一行
    line = line.strip()  # 将行尾行首的空格去除
    words = line.split()  #按空格将句子分割成单个单词
    for word in words:
        print('%s\t%s' %(word, 1))

reduce.py代码:

#!/usr/bin/python
## 注意不要用/usr/bin/env python
from operator import itemgetter
import sys
current_word = None  # 为当前单词
current_count = 0  # 当前单词频数
word = None
for line in sys.stdin:
    words = line.strip()  # 去除字符串首尾的空白字符
    word, count = words.split('\t')  # 按照制表符分隔单词和数量
    try:
        count = int(count)  # 将字符串类型的‘1’转换为整型1
    except ValueError:
        continue
    if current_word == word:  # 如果当前的单词等于读入的单词
        current_count += count  # 单词频数加1
    else:
        if current_word:  # 如果当前的单词不为空则打印其单词和频数
            print('%s\t%s' %(current_word, current_count))
        current_count = count  # 否则将读入的单词赋值给当前单词,且更新频数
        current_word = word
if current_word == word:
    print('%s\t%s' %(current_word, current_count))

执行脚本:

hadoop jar /home/hadoop/hadoop-2.7.7/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar -files ./mapper.py,./reducer.py -mapper ./mapper.py -reducer ./reducer.py -input /input/* -output /output
``

版权声明:本文为weixin_41089778原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。