1.描述一下手写MR的大概流程和规范
Mapper阶段:
我们要知道每个map任务,都会继承Mapper类, 我们先自定义WordCountMapper, 让其继承Mapper类。其中Mapper有四个参数类型<KEYIN, VALUEIN, KEYOUT, VALUEOUT>,对应的为WordCountMapper<LongWritable, Text, Text, IntWritable>。接着我们需要重写map方法(如果实验文件有n行,故文件所在的Mapper类都单独调用了n次map方法)。然后我们按默认设置将数据一行一行读取出来,获取每一行数据并且按照分隔符(空格)将其切分。随即我们将数据以<key,value>的形式循环遍历输出。最后经过context.write方法按Mapper类中定义的输出格式<Text,IntWritable>写入上下文中。
Reducer阶段
首先我们自定义WordCountReducer并让其继承Reducer类(其中Reducer有四个参数类型<KEYIN, VALUEIN, KEYOUT, VALUEOUT>,对应的为WordCountReducer<Text, IntWritable, Text, IntWritable>(由run()方法启动Reducer的任务))。然后我们需要重写reduce()方法, 设置一个初始化变量, 接着遍历当前相同key的一组values进行汇总,并累加求和。最后经过context.write方法按Mapper类中定义的输出格式<Text,IntWritable>写入上下文中。
ps: reduce方法输出后是没有排序的
Job阶段
(1)驱动类,当前MR程序入口,核心操作是提交job
(2)获取配置信息以及获取Job对象(实例化任务)
(3)关联本程序的jar,设定运行jar类型
(4)关联Mapper的输入和输出类型
(5)关联Reducer的输入输出类型
(6)关联Mapper和Reducer的jar
(7)设置输入输出路径
(8)提交job
2. 如何实现Hadoop中的序列化, 以及Hadoop的序列化和Java的序列化有什么区别?
通过自定义bean对象实现序列化接口(Writable)
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
(4)重写反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
ps: 注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用分隔符(“\t”)分开,方便后续用
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以Hadoop自己开发了一套序列化机(Writable), 具有高效, 紧凑, 可扩展, 互操作等特点。
3. 概述一下MR的执行流程
MR的执行流程整体大致分为2部分:
(1)提交job: 对当前MR进行初始化工作, 对MT和RT进行规划
(2)执行job: 执行就是每一个MT和RT
MR的大致流程是这样的, 首先要进行Input阶段,这个阶段通过InputFormat这个组件进行切片, 按行读取数据, 并且规划MT的数量。然后就进入Mapper阶段, 在Mapper阶段, 将按行读取进来的数据, 通过执行我们自定义的业务逻辑处理, 将我们处理好的每一组<key,value>写入环形缓冲区内, 接着就经历一个较为复杂的shuffle阶段,将Map输出进行分区,分组,排序等处理后, 作为输入传给Reduce端。接下来,在Reduce端再进行一次归并排序, 再进行分组, 每一组<key,value>进入reduce开始执行, 执行完之后, 最后Reduce端通过OutputFormat组件进行输出?
4. InputFormat负责数据写的时候要进行切片,为什么切片大小默认是128M?
切片的大小默认情况下和数据存储的块大小相同(为了读取数据时减少跨机器读取, 从而提升MR性能)
5. 描述一下切片的逻辑(从源码角度描述)
提到这个切片逻辑, 在MR中是通过InputFormat这个常用组件来帮我们完成切片工作的, 其核心逻辑首先我们要根据输入路径获取我们所要处理的文件, 检验我们的目标文件是否要进行忽略, 比如文件有多级目录是否要忽略子目录, 一般情况下是不忽略的. 然后我们获取切片的文件详情看当前文件是否可以进行切分, 因为很多文件就比如说压缩文件很多就不支持切分. 接着我们获取当前文件的块大小, 计算切片大小, 判断剩余文件是否要继续切片, 如何判断呢?我们将当前文件剩余大小和切片大小做个除法, 其结果要是大于1.1, 继续切分。
6. CombineTextInputFormat机制是怎么实现的?
首先它是用于处理小文件的场景, 我们要想实现这个机制首先需要设置切片大小, 然后虚拟出一个存储文件来, 根据和切片大小进行比较, 如果说当前文件大于我们设置的大小而且还小于设置大小的2倍, 那么就将文件一分为二。根据虚拟后的结果我们再将每一个虚拟文件和设置的大小进行比较, 如果大于等于设置的大小就单独形成一个切片, 否则就和下一个虚拟文件进行合并。
7. 阐述一下 Shuffle机制流程?
首先是由map方法处理后的key/value对输入到环形缓冲区。当环形缓冲区写满之后将会对缓冲区里面的数据进行分区、排序操作,然后溢写到磁盘中(也可以先使用Combiner进行合并处理),可能环形缓冲区会进行多次溢写。然后将多次溢写的数据按分区进行归并排序,合并为一个大的文件,然后将这个大文件通过压缩手段进行压缩(减小磁盘耗费量,减少网络IO传输),最后写入到磁盘中。在Reduce端,每一个Reduce按照分区号将每一个map输出的数据中的对应分区的数据拷贝到自己的缓冲区中(比如:reduceTask1是处理1号分区的数据,则它就将所有map输出的1号缓冲区的数据拷贝到自己的缓冲区中),若缓冲区不够则将数据溢写到磁盘。然后对每一个map来的数据进行归并排序。最后按照相同的key分组输入到reduce方法中。
# 8. 在MR程序中由谁来决定分区的数量,哪个阶段环节会开始往分区中写数据?
在MR程序中,由ReduceTask来决定分区的数量, 在默认情况下,有一个分区, 为0号分区, 数据从环形缓冲区溢写出后, 进行分区?
9. 阐述MR中实现分区的思路(从源码角度分析)
首先要获取我们当前MR中的ReduceTask的数量, 然后看它是否大于1, 如果大于1 就走它默认的分区规则, 用当前key的hashcode值和ReduceTask的数量取余得出分区编号. 如果当前只有一个ReduceTask,只有一个分区 ,Hadoop 就会创建一个分区器对象Partitioner, 然后获取到当前的分区编号
10. 描述一下Hadoop中实现排序比较的规则
① 直接让参与比较的对象上实现WritableComparable 接口,并在该类中实现compareTo,在compareTo中定义自己的比较规则。这种情况 当运行的的时候Hadoop会帮助我们生成 比较器对象WritableComparator。
② 自定一个比较器对象需要继承Hadoop提供的WritableComparator类,重写该类compare() 方法,在该方法中定义比较规则,注意在自定义的比较器对象中通过调用父类的super方法将自定义的比较器对象和要参与比较的对象进行关联。最后再Driver类中指定自定义的比较器对象。
11. Hadoop中实现排序的两种方案分别是什么?
(1)直接让参与比较的对象实现 WritableComparable 接口,并指定泛型,接下来实现compareTo方法 实现比较规则
(2)自定义一个比较器对象,需要继承WritableComparator类 ,重写它的compare,注意在构造器中调用父类对当前的要参与比较的对象进行实例化。当前要参与比较的对象必须要实现WritableComparable 接口。
12. 编写MR的时候什么情况下使用Combiner 具体实现流程是什么?
Combiner的使用场景:总的来说,为了提升MR程序的运行效率,为了减轻ReduceTask的压力,另外减少IO的开销。当Reduce端处理的数据考虑到多个MapTask的数据的整体集时就不能提前合并了。我们要想实现自定义Combiner, 首先要自定义一个Combiner, 让它去继承我们的Reducer, 然后我们需要重写一下Reduce方法, 进行汇总, 然后写出数据。最后我们不能忘了要在Job驱动类中添加设置。
13. OutputFormat自定义实现流程描述一下
① 自定一个 OutputFormat 类,继承Hadoop提供的OutputFormat,在该类中实现getRecordWriter() ,返回一个RecordWriter
② 自定义一个 RecordWriter 并且继承Hadoop提供的RecordWriter类,在该类中重写 write() 和 close() 在这些方法中完成自定义输出。
14. MR实现 ReduceJoin 的思路,以及ReduceJoin方案有哪些不足?
思路:
首先我们需要分析文件之间的关系,然后定位关联字段; 其次在Map阶段我们需要对多个文件进行数据整合,并且让关联字段作为输出数据的key。当一组相同key的values进入Reduce阶段的reduce方法后, 会先把两个文件数据分离出来,分别放到各自的对象中维护。然后再把当前一组维护好的数据进行关联操作,得到想要的数据结果。
不足:
这种方式中, 合并的操作是在Reduce阶段完成, Reduce端的处理压力太大, Map节点的运算负载则很低, 资源利用率不高吧, 而且在Reduce阶段很容易发生数据倾斜问题。
15. MR实现 MapJoin 的思路,以及MapJoin的局限性是什么?
思路:
首先我们要分析的是分析文件之间的关系,然后定位关联字段, 然后将小文件的数据映射到内存中的一个容器维护起来。当MapTask处理大文件的数据时,每读取一行数据,就根据当前行中的关联字段到内存的容器里获取对象的信息。最后再封装结果将其输出。
局限性:
MapJoin适用于一张表十分小、一张表很大的场景。