1. OutputFormat
OutputFormat is the base class for MapReduce output: every MapReduce data output goes through an implementation of the abstract OutputFormat class. Two common OutputFormat subclasses are introduced below.
- TextOutputFormat: the default output format. It writes each record as a line of text. Its keys and values may be of any type, because TextOutputFormat calls toString() to convert them to strings.
- SequenceFileOutputFormat: writing SequenceFileOutputFormat output to feed a subsequent MapReduce job is a good choice, because the format is compact and compresses well.
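As a quick illustration, here is a minimal sketch of how a driver could switch a job from the default TextOutputFormat to a block-compressed SequenceFileOutputFormat. The helper class and the output directory are assumptions made for this example, not part of the original program:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SeqOutputConfig {
    // Hypothetical helper: reconfigure an existing Job to write a
    // block-compressed SequenceFile instead of plain text lines.
    public static void useSequenceOutput(Job job, String outDir) {
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // BLOCK compresses groups of records together, which usually
        // yields better ratios than compressing each value on its own.
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
        SequenceFileOutputFormat.setOutputPath(job, new Path(outDir));
    }
}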
2. Custom OutputFormat
Requirement analysis:
We want a single MapReduce program to route two kinds of results to different output files depending on the content of each record. This kind of output requirement can be met with a custom OutputFormat.
Implementation steps:
- Define a class that extends FileOutputFormat.
- Implement a custom RecordWriter and override write(), the method that actually writes the output data.
Requirement:
Given web request log data:
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.lagou.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
Expected output
lagou.log:
http://www.lagou.com
other.log:
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
Reference code
Mapper
package com.lagou.mr.output;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Forward each log line unchanged; the line itself becomes the output key.
        context.write(value, NullWritable.get());
    }
}
Reducer
package com.lagou.mr.output;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Emit each distinct line once; duplicate input lines collapse into a single key here.
        context.write(key, NullWritable.get());
    }
}
OutputFormat
package com.lagou.mr.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> {
    // Returns the object that writes out the data.
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // Open the two output streams and hand them to the writer.
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // Define the output paths.
        FSDataOutputStream lagouOut = fs.create(new Path("D:\\out_file\\out_2\\lagou.log"));
        FSDataOutputStream otherOut = fs.create(new Path("D:\\out_file\\out_2\\other.log"));
        return new CustomWriter(lagouOut, otherOut);
    }
}
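The hard-coded D:\ paths keep the demo simple but tie the format to one machine. A more portable variant of getRecordWriter, sketched below under the assumption of a single reduce task (as in this demo), derives the file locations from the output path the Driver configures:

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // Resolve both log files under the job's configured output
        // directory instead of a hard-coded local path.
        Path outDir = FileOutputFormat.getOutputPath(context);
        FSDataOutputStream lagouOut = fs.create(new Path(outDir, "lagou.log"));
        FSDataOutputStream otherOut = fs.create(new Path(outDir, "other.log"));
        return new CustomWriter(lagouOut, otherOut);
    }

With several reduce tasks this naive version would break, since every task would try to create the same two files; the task attempt ID would then need to become part of the file names.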
RecordWriter
package com.lagou.mr.output;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class CustomWriter extends RecordWriter<Text, NullWritable> {
    // The two output streams, one per target file.
    private FSDataOutputStream lagouOut;
    private FSDataOutputStream otherOut;

    // The constructor receives the two output streams.
    public CustomWriter(FSDataOutputStream lagouOut, FSDataOutputStream otherOut) {
        this.lagouOut = lagouOut;
        this.otherOut = otherOut;
    }

    // The write logic decides which stream each record goes to.
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String line = key.toString();
        if (line.contains("lagou")) {
            lagouOut.write(line.getBytes());
            lagouOut.write("\r\n".getBytes());
        } else {
            otherOut.write(line.getBytes());
            otherOut.write("\r\n".getBytes());
        }
    }

    // Close and release both streams.
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(lagouOut);
        IOUtils.closeStream(otherOut);
    }
}
Driver
package com.lagou.mr.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        /*
        1. Get a Configuration object and a Job instance
        2. Set the local path of the program jar
        3. Set the Mapper/Reducer classes
        4. Set the Mapper output key/value types
        5. Set the final output key/value types
        6. Set the input path of the raw data
        7. Set the job output path
        8. Submit the job
        */
        // 1. Get a Configuration object and a Job instance
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "OutputDriver");
        // 2. Set the local path of the program jar
        job.setJarByClass(OutputDriver.class);
        // 3. Set the Mapper/Reducer classes
        job.setMapperClass(OutputMapper.class);
        job.setReducerClass(OutputReducer.class);
        // 4. Set the Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Use the custom OutputFormat
        job.setOutputFormatClass(CustomOutputFormat.class);
        // 6. Set the input path of the raw data
        FileInputFormat.setInputPaths(job, new Path("D:\\data\\click_log"));
        // 7. Set the job output path. It is still required even with a custom
        //    OutputFormat, because MR writes the _SUCCESS marker file there.
        FileOutputFormat.setOutputPath(job, new Path("D:\\out_file\\out_2\\out"));
        // 8. Submit the job
        final boolean flag = job.waitForCompletion(true);
        // JVM exit code: 0 for success, non-zero for failure
        System.exit(flag ? 0 : 1);
    }
}
Verify that the records have indeed been routed to the two different output files!
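A minimal verification sketch (plain Java, assuming the local demo paths used above) that prints both files so the routing can be checked by eye:

import java.nio.file.Files;
import java.nio.file.Paths;

public class VerifyOutput {
    public static void main(String[] args) throws Exception {
        // Print both demo output files; every line containing "lagou"
        // should appear in lagou.log and nowhere else.
        for (String name : new String[]{"lagou.log", "other.log"}) {
            System.out.println("== " + name + " ==");
            Files.readAllLines(Paths.get("D:\\out_file\\out_2", name))
                 .forEach(System.out::println);
        }
    }
}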