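Analysis of two Hadoop reduce() pitfalls: iterating over values changes the key, and the value handed out during the iteration is always the same object
I. Discovering the problem
1. Iterating over the values parameter of reduce() also changes the key
Suppose we have a batch of orders, each containing several items. We need to group the items by order id and then sort the items within each order by price.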
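Mapper code:
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @ClassName Job13Mapper
 * @Description Reads order data and emits a Job13Data order object as the key and the order amount (DoubleWritable) as the value
 * @Author liangfeng
 * @Date 2019-06-28 22:53
 * @Version 1.0
 **/
public class Job13Mapper extends Mapper<LongWritable, Text, Job13Data, DoubleWritable> {
    // Output objects are created once and reused for every record;
    // context.write() serializes them immediately, so reuse is safe here.
    Job13Data job13Data;
    DoubleWritable doubleWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        job13Data = new Job13Data();
        doubleWritable = new DoubleWritable();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Tab-separated input: column 0 is the order id, column 2 is the price.
        String[] split = value.toString().split("\t");
        job13Data.setOrderId(split[0]);
        job13Data.setPrice(Double.valueOf(split[2]));
        System.out.println(job13Data.toString()); // debug output
        doubleWritable.set(job13Data.getPrice());
        context.write(job13Data, doubleWritable);
    }
}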
Job13Data bean:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @ClassName Job13Data
 * @Description Order bean implementing WritableComparable so that records can be sorted by price
 * @Author liangfeng
 * @Date 2019-06-28 22:59
 * @Version 1.0
 **/
public class Job13Data implements WritableComparable<Job13Data> {
    private String orderId;
    private Double price;

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    /**
     * @Description: sort order: order id ascending, then price descending within the
     * same order. Different orders must not compare as equal, otherwise the sort has
     * no consistent order.
     * @Param null
     * @return:
     * @Author: liangfeng
     * @Date: 2019-07-09 12:07
     */
    @Override
    public int compareTo(Job13Data o) {
        int byOrder = orderId.compareTo(o.getOrderId());
        if (byOrder != 0) {
            return byOrder;
        }
        // Same order: compare the other way round to sort prices in descending order.
        return o.getPrice().compareTo(this.price);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    /**
     * @Description: text output format
     * @Param null
     * @return:
     * @Author: liangfeng
     * @Date: 2019-07-09 12:08
     */
    @Override
    public String toString() {
        return orderId + "\t" + price;
    }
}
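The order this job needs from the shuffle can be tried out in plain Java, without Hadoop on the classpath. This is only an illustrative sketch: `OrderItem` and `SortDemo` are hypothetical names, and `OrderItem` stands in for Job13Data with the ordering "order id ascending, then price descending".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java stand-in for Job13Data (no Hadoop dependency).
class OrderItem implements Comparable<OrderItem> {
    final String orderId;
    final double price;

    OrderItem(String orderId, double price) {
        this.orderId = orderId;
        this.price = price;
    }

    // Order id ascending first, then price descending within one order.
    @Override
    public int compareTo(OrderItem o) {
        int byOrder = orderId.compareTo(o.orderId);
        if (byOrder != 0) {
            return byOrder;
        }
        return Double.compare(o.price, price); // reversed arguments: descending
    }

    @Override
    public String toString() {
        return orderId + "\t" + price;
    }
}

class SortDemo {
    // Returns a sorted copy, imitating the order in which keys reach the reducer.
    static List<OrderItem> sorted(List<OrderItem> items) {
        List<OrderItem> copy = new ArrayList<>(items);
        Collections.sort(copy);
        return copy;
    }
}
```

Sorting the items (2, 1.0), (1, 3.0), (1, 5.0), (2, 4.0) yields order 1 first with 5.0 before 3.0, then order 2 with 4.0 before 1.0.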
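Job13Partitioner (partitioning):
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @ClassName Job13Partitioner
 * @Description Partitions by order id so that all items of one order reach the same reducer
 * @Author liangfeng
 * @Date 2019-06-28 23:18
 * @Version 1.0
 **/
public class Job13Partitioner extends Partitioner<Job13Data, DoubleWritable> {
    @Override
    public int getPartition(Job13Data job13Data, DoubleWritable doubleWritable, int numPartitions) {
        String orderId = job13Data.getOrderId();
        System.out.println("orderid" + orderId); // debug output
        // Mask the sign bit so a negative hashCode cannot yield a negative partition.
        return (orderId.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}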
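Why the partitioner masks with `Integer.MAX_VALUE`: `String.hashCode()` may be negative, Java's `%` keeps the sign of the dividend, and `Math.abs(Integer.MIN_VALUE)` is itself still negative, so clearing the sign bit is the safe way to land in `[0, numPartitions)`. A small sketch; `PartitionDemo` and its methods are hypothetical names.

```java
// Hypothetical helper reproducing the formula used by Job13Partitioner.
class PartitionDemo {
    // Clear the sign bit, then take the modulus: always in [0, numPartitions).
    static int partitionFor(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    static int partitionFor(String orderId, int numPartitions) {
        return partitionFor(orderId.hashCode(), numPartitions);
    }

    // Naive version without the mask: a negative hash gives a negative result.
    static int naivePartitionFor(int hash, int numPartitions) {
        return hash % numPartitions;
    }
}
```

With these definitions, `naivePartitionFor(-7, 3)` returns -1, which is not a valid partition, while `partitionFor(-7, 3)` returns 1.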
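Job13Group (grouping):
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @ClassName Job13Group
 * @Description Groups by order id: keys with the same order id go into one reduce() call
 * @Author liangfeng
 * @Date 2019-06-28 23:22
 * @Version 1.0
 **/
public class Job13Group extends WritableComparator {
    public Job13Group() {
        // true: WritableComparator creates key instances, so compare() receives deserialized objects
        super(Job13Data.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Job13Data job13Data = (Job13Data) a;
        Job13Data job13Data1 = (Job13Data) b;
        System.out.println("compare" + job13Data.toString() + job13Data1.toString()); // debug output
        // Only the order id matters for grouping; the price is ignored.
        return job13Data.getOrderId().compareTo(job13Data1.getOrderId());
    }
}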
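Grouping happens on the already sorted keys: a new reduce() call starts whenever the grouping comparator reports a difference between neighbouring keys. The sketch below imitates that in plain Java under that assumption; `Row` and `GroupDemo` are hypothetical names, and the comparator, like Job13Group, looks only at the order id.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical row type: an order id plus an item price.
class Row {
    final String orderId;
    final double price;

    Row(String orderId, double price) {
        this.orderId = orderId;
        this.price = price;
    }
}

class GroupDemo {
    // Like Job13Group: two rows belong together when their order ids match.
    static final Comparator<Row> BY_ORDER_ID = Comparator.comparing((Row r) -> r.orderId);

    // Split an already sorted list into runs the grouping comparator treats as equal.
    static List<List<Row>> group(List<Row> sorted, Comparator<Row> grouping) {
        List<List<Row>> groups = new ArrayList<>();
        List<Row> current = null;
        Row previous = null;
        for (Row row : sorted) {
            // Compare each row with the one right before it, as the framework
            // compares the current key with the next one.
            if (previous == null || grouping.compare(previous, row) != 0) {
                current = new ArrayList<>();
                groups.add(current);
            }
            current.add(row);
            previous = row;
        }
        return groups;
    }
}
```

Rows (1, 5.0), (1, 3.0), (2, 4.0) form two groups: both rows of order 1 end up in one reduce() call even though their prices differ.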
Job13Reduce (the reducer iterates over the values and writes the results):
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @ClassName Job13Reduce
 * @Description Iterates over the values in reduce() and writes Job13Data and the price
 * @Author liangfeng
 * @Date 2019-06-28 23:34
 * @Version 1.0
 **/
public class Job13Reduce extends Reducer<Job13Data, DoubleWritable, Job13Data, DoubleWritable> {
    @Override
    protected void reduce(Job13Data key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        int i = 0;
        for (DoubleWritable value : values) {
            if (i > 1) {
                break;
            }
            System.out.println("value" + key.toString());
            context.write(key, value); // the line to watch
            i++;
        }
    }
}
Output (columns: order id from the key, price from the key, value):
key value
1 2 2
1 3 3
1 4 4
1 5 5
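Look at this line inside the loop: context.write(key, value);
Here key is the first argument passed into reduce(). As we iterate over values, value changes as expected, but why does key change too? key is a single object, and no other object is created anywhere in the loop.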
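2. While iterating over values, the value object is always the same instance
Requirement: suppose we want to copy the values of reduce() into a collection in order to sort them, take the maximum, and so on.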
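Consider the following reducer, which again uses Job13Data (here as both the key and the value type):
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @ClassName Job14Reduce
 * @Description Iterates over values, stores each value in a list, then prints the list
 * @Author liangfeng
 * @Date 2019-06-28 23:34
 * @Version 1.0
 **/
public class Job14Reduce extends Reducer<Job13Data, Job13Data, Job13Data, NullWritable> {
    @Override
    protected void reduce(Job13Data key, Iterable<Job13Data> values, Context context) throws IOException, InterruptedException {
        List<Job13Data> data = new LinkedList<Job13Data>();
        for (Job13Data value : values) {
            data.add(value); // stores the reference handed out by the iterator
        }
        for (Job13Data datum : data) {
            System.out.println(datum); // print the value
            // System.out.println(Integer.toHexString(System.identityHashCode(datum))); // print the object identity
        }
    }
}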
Output:
1 5
1 5
1 5
1 5
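All four lines show the last record, and printing the object identities shows the same object every time. Is the iteration really handing out one and the same object?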
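The second symptom can be reproduced without Hadoop. This sketch rests on one assumption: the hypothetical `ReusingIterable` behaves like the reduce-side iterator in that it hands out a single mutable object and overwrites its field on every `next()`. Storing the references yields four copies of the last price; storing a copy of each value keeps them apart. `MutablePrice` and `ReuseDemo` are hypothetical names as well.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical mutable record, standing in for a Writable value.
class MutablePrice {
    double price;

    MutablePrice copy() {
        MutablePrice c = new MutablePrice();
        c.price = price;
        return c;
    }
}

// Iterable that reuses one MutablePrice instance for every element.
class ReusingIterable implements Iterable<MutablePrice> {
    private final double[] prices;
    private final MutablePrice shared = new MutablePrice();

    ReusingIterable(double... prices) {
        this.prices = prices;
    }

    @Override
    public Iterator<MutablePrice> iterator() {
        return new Iterator<MutablePrice>() {
            private int pos = 0;

            @Override
            public boolean hasNext() {
                return pos < prices.length;
            }

            @Override
            public MutablePrice next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.price = prices[pos++]; // overwrite the shared object, never allocate
                return shared;
            }
        };
    }
}

class ReuseDemo {
    // Keeps the references as handed out: every element is the same object.
    static List<MutablePrice> collectReferences(Iterable<MutablePrice> values) {
        List<MutablePrice> out = new ArrayList<>();
        for (MutablePrice v : values) {
            out.add(v);
        }
        return out;
    }

    // Keeps a copy of each value: every element has its own price.
    static List<MutablePrice> collectCopies(Iterable<MutablePrice> values) {
        List<MutablePrice> out = new ArrayList<>();
        for (MutablePrice v : values) {
            out.add(v.copy());
        }
        return out;
    }
}
```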
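II. Hypotheses and where to look
For the first problem: iterating over values probably modifies the key.
For the second problem: the Iterable passed into reduce() probably does something special to the objects it returns.
Where to look: mainly the Iterable passed into reduce(). Read its source to see whether it handles key and value specially when producing a value: does advancing through values also modify key, and is a single value object reused throughout the iteration?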
III. Source code analysis
Reducer.run() source:
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
}
cleanup(context);
}
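Both key and values are obtained from the context object, so next we look at the context implementation.
ReduceTask has a createReduceContext() method that creates the context passed into run().
createReduceContext() instantiates a ReduceContextImpl, so let us look at ReduceContextImpl in detail: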
/**
* The context passed to the {@link Reducer}.
* @param <KEYIN> the class of the input keys
* @param <VALUEIN> the class of the input values
* @param <KEYOUT> the class of the output keys
* @param <VALUEOUT> the class of the output values
*/
public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private RawKeyValueIterator input;
private Counter inputValueCounter;
private Counter inputKeyCounter;
private RawComparator<KEYIN> comparator;
private KEYIN key; // current key
private VALUEIN value; // current value
private boolean firstValue = false; // first value in key
private boolean nextKeyIsSame = false; // more w/ this key
private boolean hasMore; // more in file
protected Progressable reporter;
private Deserializer<KEYIN> keyDeserializer;
private Deserializer<VALUEIN> valueDeserializer;
private DataInputBuffer buffer = new DataInputBuffer();
private BytesWritable currentRawKey = new BytesWritable();
private ValueIterable iterable = new ValueIterable();
private final SerializationFactory serializationFactory;
private final Class<KEYIN> keyClass;
private final Class<VALUEIN> valueClass;
private final Configuration conf;
private final TaskAttemptID taskid;
private int currentKeyLength = -1;
private int currentValueLength = -1;
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputKeyCounter,
Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator<KEYIN> comparator,
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
this.serializationFactory = new SerializationFactory(conf);
// look up a deserializer for the key class; it turns raw bytes back into key objects
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
// the deserializer reads from the shared buffer
this.keyDeserializer.open(buffer);
// same for the value deserializer
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer);
hasMore = input.next();
this.keyClass = keyClass;
this.valueClass = valueClass;
this.conf = conf;
this.taskid = taskid;
}
/** Start processing next unique key. */
// called by Reducer.run() (shown above) to start processing each new key group
public boolean nextKey() throws IOException,InterruptedException {
while (hasMore && nextKeyIsSame) {
nextKeyValue();
}
if (hasMore) {
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
return nextKeyValue(); // calls nextKeyValue() once to load the group's first key/value pair
} else {
return false;
}
}
/**
* Advance to the next key/value pair.
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!hasMore) {
key = null;
value = null;
return false;
}
firstValue = !nextKeyIsSame;
DataInputBuffer nextKey = input.getKey(); // raw bytes of the next key
currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition());
// copy the key bytes into buffer for deserialization
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
// deserialize the key; the existing key object is passed in and reused
key = keyDeserializer.deserialize(key);
DataInputBuffer nextVal = input.getValue();
buffer.reset(nextVal.getData(), nextVal.getPosition(),
nextVal.getLength() - nextVal.getPosition());
// deserialize the value; the existing value object is passed in and reused
value = valueDeserializer.deserialize(value);
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();
hasMore = input.next();
if (hasMore) {
nextKey = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
inputValueCounter.increment(1);
return true;
}
public KEYIN getCurrentKey() {
return key;
}
@Override
public VALUEIN getCurrentValue() {
return value;
}
protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
@Override
public boolean hasNext() {
return firstValue || nextKeyIsSame;
}
@Override
public VALUEIN next() {
// if this is the first record, we don't need to advance
if (firstValue) {
firstValue = false;
return value;
}
// if this isn't the first record and the next key is different, they
// can't advance it here.
if (!nextKeyIsSame) {
throw new NoSuchElementException("iterate past last value");
}
// otherwise, go to the next key/value pair
try {
// each nextKeyValue() call refills both key and value; then value is returned
nextKeyValue();
return value;
} catch (IOException ie) {
throw new RuntimeException("next value iterator failed", ie);
} catch (InterruptedException ie) {
// this is bad, but we can't modify the exception list of java.util
throw new RuntimeException("next value iterator interrupted", ie);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove not implemented");
}
/**
* This method is called to write the record that was most recently
* served (before a call to the mark). Since the framework reads one
* record in advance, to get this record, we serialize the current key
* and value
* @param out
* @throws IOException
*/
private void writeFirstKeyValueBytes(DataOutputStream out)
throws IOException {
assert (getCurrentKey() != null && getCurrentValue() != null);
WritableUtils.writeVInt(out, currentKeyLength);
WritableUtils.writeVInt(out, currentValueLength);
Serializer<KEYIN> keySerializer =
serializationFactory.getSerializer(keyClass);
keySerializer.open(out);
keySerializer.serialize(getCurrentKey());
Serializer<VALUEIN> valueSerializer =
serializationFactory.getSerializer(valueClass);
valueSerializer.open(out);
valueSerializer.serialize(getCurrentValue());
}
}
protected class ValueIterable implements Iterable<VALUEIN> {
private ValueIterator iterator = new ValueIterator();
@Override
public Iterator<VALUEIN> iterator() {
return iterator;
}
}
/**
* Iterate through the values for the current key, reusing the same value
* object, which is stored in the context.
* @return the series of values associated with the current key. All of the
* objects returned directly and indirectly from this method are reused.
*/
public
Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
return iterable;
}
}
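In the code above, iterating over values mainly means calling nextKeyValue().
That method deserializes the raw bytes into the key and value objects.
Next, let us see how key and value are obtained.
Taking the line value = valueDeserializer.deserialize(value); as an example, the deserialize() method (of WritableSerialization's deserializer) looks like this: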
@Override
public Writable deserialize(Writable w) throws IOException {
Writable writable;
// if the object passed in is null, create one from the class
if (w == null) {
writable
= (Writable) ReflectionUtils.newInstance(writableClass, getConf()); // create a new object
} else {
writable = w;
}
// fill the object's fields through its Writable readFields() implementation; dataIn is the input stream
writable.readFields(dataIn);
return writable;
}
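This is the root of the problem. What does it tell us?
The only place in the whole flow that creates an object is:
writable = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
When the object passed in is not null, nothing is created: the fields of the existing object are simply overwritten. Because nextKeyValue() always passes its current key and value fields back in, an object is created only on the very first call, while those fields are still null.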
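The create-only-if-null logic can be reproduced in plain Java. `PriceRecord`, `DeserializeDemo` and `roundTrip` are hypothetical names; `deserialize` follows the shape of the method above but uses a plain constructor instead of `ReflectionUtils.newInstance`, and `roundTrip` wraps `IOException` only to keep the example easy to call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical record with a Writable-style readFields().
class PriceRecord {
    String orderId;
    double price;

    void readFields(DataInput in) throws IOException {
        orderId = in.readUTF();
        price = in.readDouble();
    }
}

class DeserializeDemo {
    // Allocate only when no object is passed in; otherwise overwrite the given one.
    static PriceRecord deserialize(PriceRecord reuse, DataInput in) throws IOException {
        PriceRecord record = (reuse == null) ? new PriceRecord() : reuse;
        record.readFields(in);
        return record;
    }

    // Encode one record in Job13Data.write() layout (writeUTF, then writeDouble),
    // then deserialize it into `reuse`, or into a new object if `reuse` is null.
    static PriceRecord roundTrip(PriceRecord reuse, String orderId, double price) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(orderId);
            out.writeDouble(price);
            out.flush();
            DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return deserialize(reuse, in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `roundTrip(null, ...)` once and then feeding the result back in returns the same object each time, with only its fields changed.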
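Summary
Hadoop creates only one key object and one value object.
Iteration keeps overwriting the fields of these two objects, and key and value are always refilled together, as a pair.
So there is only ever one value object.
And while iterating over values, the fields of key change as well: grouping is by order id only, so the keys inside one group differ in price, and key always holds the price of the record just read.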
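The summary can be checked with a toy model. `MiniReduceContext` is a hypothetical and heavily simplified imitation of ReduceContextImpl, not Hadoop code: it owns exactly one key object and one value object, and each step refills both from the next sorted pair, as nextKeyValue() does. Reading the key inside the values loop therefore shows the key of the record just consumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable key holding an order id and a price, like Job13Data.
class OrderKey {
    String orderId;
    double price;
}

// Hypothetical mutable value, like DoubleWritable.
class PriceValue {
    double value;
}

// Toy model of ReduceContextImpl: exactly one key object and one value object exist.
class MiniReduceContext {
    private final String[] orderIds; // input pairs, assumed already sorted
    private final double[] prices;
    private int next = 0;            // index of the next unread pair
    final OrderKey key = new OrderKey();
    final PriceValue value = new PriceValue();

    MiniReduceContext(String[] orderIds, double[] prices) {
        this.orderIds = orderIds;
        this.prices = prices;
    }

    // Like nextKeyValue(): overwrite BOTH shared objects with the next pair.
    private void nextKeyValue() {
        key.orderId = orderIds[next];
        key.price = prices[next];
        value.value = prices[next];
        next++;
    }

    // Like nextKeyIsSame: the grouping looks at the order id only.
    private boolean nextKeyIsSame() {
        return next < orderIds.length && orderIds[next].equals(key.orderId);
    }

    // One reduce() call over the first group, recording what key and value hold at each step.
    List<String> reduceFirstGroup() {
        List<String> seen = new ArrayList<>();
        nextKeyValue(); // nextKey(): load the first pair of the group
        seen.add(key.orderId + " " + key.price + " " + value.value);
        while (nextKeyIsSame()) { // ValueIterator.next() for each further value
            nextKeyValue();
            seen.add(key.orderId + " " + key.price + " " + value.value);
        }
        return seen;
    }
}
```

For order 1 with prices 2, 3, 4 and 5 the recorded lines run from `1 2.0 2.0` to `1 5.0 5.0`: the key's price follows the value, the same pattern as the Job13Reduce output above.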
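IV. Ideas for avoiding the problem
1. For the first problem: Hadoop's ValueIterator could have used a key/value pair type as its element type. That would make it explicit that the key changes along with the value and avoid the misunderstanding.
2. For the second problem: Hadoop presumably reuses objects on purpose, to avoid the memory cost of allocating a new object for every record. The Writable interface could provide a method that returns a new copy of the object, making it convenient to add values to a collection.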
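Until Writable offers such a method, the usual workaround is to copy each value before keeping it; Hadoop already provides `WritableUtils.clone(orig, conf)` for this. The plain-Java sketch below only imitates the idea and does not use Hadoop: `SimpleWritable`, `OrderRecord`, `WritableCopies` and `cloneOf` are hypothetical names, and the copy is made by writing the object out with `write()` and reading it back into a fresh instance with `readFields()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical subset of the Writable contract.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical order record implementing that contract.
class OrderRecord implements SimpleWritable {
    String orderId = "";
    double price;

    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    public void readFields(DataInput in) throws IOException {
        orderId = in.readUTF();
        price = in.readDouble();
    }
}

final class WritableCopies {
    private WritableCopies() {}

    // Serialize orig, then deserialize the bytes into a brand-new object from the factory.
    static <T extends SimpleWritable> T cloneOf(T orig, Supplier<T> factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            orig.write(out);
            out.flush();
            T copy = factory.get();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In Job14Reduce the same pattern would read `data.add(WritableCopies.cloneOf(value, OrderRecord::new));` in place of `data.add(value);`. Going through `write`/`readFields` means the copy works for any class that already serializes correctly, without writing a separate copy method.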