整体的规划
- Mapper类继承Mapper 实现map方法
- Reducer类继承Reducer类实现reduce方法
- Driver驱动类提交job
Map类
package com.dcit.mr;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper实现类:继承Hadoop的Mapper类并重写map方法
 * 泛型 <LongWritable, Text, Text, IntWritable>
 * 分别对应:输入的key-value类型(行偏移量、行内容)和输出的key-value类型(单词、计数1)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused output objects: avoids allocating a new Text/IntWritable for every word.
    private final Text outKey = new Text();
    private final IntWritable one = new IntWritable(1);

    /**
     * Map-side core logic; the framework calls this once per input line.
     * Splits the line into words and emits (word, 1) for each word.
     *
     * @param key     byte offset of the line within the input split
     * @param value   the text of one input line
     * @param context used to emit intermediate (word, 1) pairs to the shuffle
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Skip blank lines entirely. The original only logged "为空" and then
        // still emitted an empty-string key for them.
        if (StringUtils.isBlank(line)) {
            return;
        }
        // Split on runs of whitespace so repeated spaces/tabs don't produce empty tokens.
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                outKey.set(word);
                context.write(outKey, one);
            }
        }
    }
}
Reduce类
package com.dcit.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
 * 继承Reducer类,重写reduce方法,实现聚合操作
 * 泛型 <Text, IntWritable, Text, IntWritable>
 * 分别对应:输入的key-value类型(与Mapper的输出一致)和输出的key-value类型(单词、总次数)
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value object: avoids allocating a new IntWritable per key.
    private final IntWritable outValue = new IntWritable();

    /**
     * Reduce-side core logic; the framework calls this once per distinct key,
     * with all counts emitted for that key. Sums the counts and writes (word, total).
     *
     * @param key     the word (reused by the framework; written out directly)
     * @param values  all partial counts for this word
     * @param context used to emit the final (word, total) pair
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        // Accumulate only; the original redundantly re-set the output key on
        // every iteration of this loop.
        for (IntWritable value : values) {
            total += value.get();
        }
        outValue.set(total);
        // The incoming key can be written as-is; no separate Text copy is needed.
        context.write(key, outValue);
    }
}
Driver类
package com.dcit.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
* MR程序的驱动类:主要用于提交MR任务
*/
public class WordCountDriver {
    /**
     * Configures and submits the WordCount job.
     * Input/output paths may be supplied as args[0]/args[1]; when absent,
     * the original hard-coded defaults are used, so existing usage still works.
     * Exits with status 0 on success, 1 on failure.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : "data\\word.txt";
        String outputPath = args.length > 1 ? args[1] : "output\\wcoutput2";
        // 1- Create the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2- Set the driver class so Hadoop can locate the jar
        job.setJarByClass(WordCountDriver.class);
        // 3- Input directory/file to read
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        // 4- Mapper class
        job.setMapperClass(WordCountMapper.class);
        // 5- Mapper output key-value types (needed because they differ per job)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 6- Reducer class
        job.setReducerClass(WordCountReducer.class);
        // 7- Final output key-value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 8- Output path (must not already exist, or the job fails)
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // 9- Submit and wait; report the result as the process exit status
        //    instead of merely printing 0/1 to stdout.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
附录:pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.dcit.mr</groupId>
<artifactId>mr</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>maven</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
</dependencies>
</project>
输入文件word.txt

输出结果


版权声明:本文为iiiitttttt原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。