Requirement: given the following input file, write the lines that contain itstar to one output file and all other lines to a second file, with the output file path and file names customizable.
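The actual file content is not shown above, so purely for illustration, a hypothetical outputformat.txt with one URL per line could look like the lines below; any line containing the string itstar should end up in itstar.log, everything else in other.log:

http://www.itstar.com
http://www.baidu.com
http://bbs.itstar.com
http://www.google.com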

1: Define the FilterMap class
package OutputFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FilterMap extends Mapper<LongWritable, Text, Text, NullWritable> {

    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();
        k.set(line);
        // Emit the whole line as the key; the value carries no information
        context.write(k, NullWritable.get());
    }
}
2: Define the FilterReducer class
package OutputFormat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // The whole input line is the key, so reduce is called once per distinct line.
        // Append the line break here because the custom RecordWriter below writes raw bytes
        // without adding any separator.
        String line = key.toString();
        line = line + "\r\n";
        context.write(new Text(line), NullWritable.get());
    }
}
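Because the line itself is used as the key, identical input lines are merged and written only once by the reducer above. If duplicates should be preserved, a small variant (not part of the original code) can write the line once per occurrence by iterating the values:

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // One NullWritable arrives per occurrence of the line in the input,
        // so writing inside the loop keeps duplicate lines in the output.
        for (NullWritable value : values) {
            context.write(new Text(key.toString() + "\r\n"), NullWritable.get());
        }
    }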
3: Define the FilterlogRecordWriter class
package OutputFormat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class FilterlogRecordWriter extends RecordWriter<Text, NullWritable> {

    FSDataOutputStream itstarOut = null;
    FSDataOutputStream otherOut = null;

    /**
     * Sets up the two output streams.
     * @param job used to obtain the configuration, and from it the FileSystem
     */
    public FilterlogRecordWriter(TaskAttemptContext job) {
        // Open one output file for itstar records and one for everything else
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path itstarPath = new Path("E:\\bigdata_code\\outputformat\\out\\itstar.log");
            Path otherPath = new Path("E:\\bigdata_code\\outputformat\\out\\other.log");
            itstarOut = fs.create(itstarPath);
            otherOut = fs.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        // Route each record by content: lines containing "itstar" go to itstar.log
        if (text.toString().contains("itstar")) {
            itstarOut.write(text.toString().getBytes());
        } else {
            otherOut.write(text.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Close both streams so the data is flushed to disk
        if (itstarOut != null) {
            itstarOut.close();
        }
        if (otherOut != null) {
            otherOut.close();
        }
    }
}
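The two output paths above are hardcoded, which only partly satisfies the requirement of customizable paths and file names. A minimal sketch of one way to make them configurable, assuming two hypothetical configuration keys (filter.itstar.path and filter.other.path) that the driver would set before creating the job; this constructor variant also needs import org.apache.hadoop.conf.Configuration and is not part of the original code:

    public FilterlogRecordWriter(TaskAttemptContext job) {
        try {
            Configuration conf = job.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            // filter.itstar.path / filter.other.path are hypothetical keys; the second
            // argument of conf.get() is the fallback used when the key is not set.
            Path itstarPath = new Path(conf.get("filter.itstar.path",
                    "E:\\bigdata_code\\outputformat\\out\\itstar.log"));
            Path otherPath = new Path(conf.get("filter.other.path",
                    "E:\\bigdata_code\\outputformat\\out\\other.log"));
            itstarOut = fs.create(itstarPath);
            otherOut = fs.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }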
4: Define the FilterlogOutputFormat class
package OutputFormat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterlogOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Hand every output record to the custom RecordWriter defined above
        return new FilterlogRecordWriter(job);
    }
}
5: Define the FilterDriver class
package OutputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hardcoded local input file and output directory for this example
        args = new String[]{"E:\\bigdata_code\\outputformat\\outputformat.txt",
                "E:\\bigdata_code\\outputformat\\out"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(FilterReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the custom output format with the job
        job.setOutputFormatClass(FilterlogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Even with a custom OutputFormat, an output directory must still be set:
        // FilterlogOutputFormat extends FileOutputFormat, which writes a _SUCCESS file there.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
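If the configurable-path variant of FilterlogRecordWriter sketched in step 3 were used, the driver could pass the desired file locations through the configuration before creating the job. The keys below are the same hypothetical ones as in that sketch; this fragment would replace the first two job-setup lines of main():

        Configuration conf = new Configuration();
        // Hypothetical keys read by the sketched FilterlogRecordWriter constructor
        conf.set("filter.itstar.path", "E:\\bigdata_code\\outputformat\\out\\itstar.log");
        conf.set("filter.other.path", "E:\\bigdata_code\\outputformat\\out\\other.log");
        Job job = Job.getInstance(conf);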
Output:
