Source-code analysis: why iterating over the reduce values parameter in Hadoop changes the key, and why every iterated value is the same object

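I. Discovering the problem

1. Iterating over the values parameter of reduce also changes the key

Suppose we have the following requirement: there is a batch of orders, and each order contains several items. We need to group the records by order id and then sort the items within each order by price.

Mapper code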

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @ClassName Job13Mapper
 * @Description Reads order records, converts each one into a Job13Data order object plus a DoubleWritable order amount, and emits the pair
 * @Author liangfeng
 * @Date 2019-06-28 22:53
 * @Version 1.0
 **/
public class Job13Mapper extends Mapper<LongWritable, Text, Job13Data, DoubleWritable> {

    // Output objects are created once in setup() and reused for every record
    private Job13Data job13Data;
    private DoubleWritable doubleWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        job13Data = new Job13Data();
        doubleWritable = new DoubleWritable();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        job13Data.setOrderId(split[0]); // order id
        job13Data.setPrice(Double.valueOf(split[2])); // order amount
        System.out.println(job13Data.toString());
        doubleWritable.set(job13Data.getPrice());
        context.write(job13Data, doubleWritable);
    }
}

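Job13Data bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @ClassName Job13Data
 * @Description Order bean; implements WritableComparable so that records sort by price
 * @Author liangfeng
 * @Date 2019-06-28 22:59
 * @Version 1.0
 **/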
public class Job13Data implements WritableComparable<Job13Data> {

    private String orderId;
    private Double price;

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

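    /**
     * @Description: Sorts by order id first, then by price in descending order within the same order
     * @Author: liangfeng
     * @Date: 2019-07-09 12:07
     */
    @Override
    public int compareTo(Job13Data o) {
        int byOrder = orderId.compareTo(o.orderId);
        if (byOrder != 0) {
            // Different orders must not compare as equal, otherwise the sort order is inconsistent
            return byOrder;
        }
        return o.price.compareTo(this.price); // descending price
    }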

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    /**
     * @Description: Text output format
     * @Author: liangfeng
     * @Date: 2019-07-09 12:08
     */
    @Override
    public String toString() {
        return orderId+"\t"+price;
    }
}

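Job13Partitioner (partitioning)

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @ClassName Job13Partitioner
 * @Description Partitions by order id
 * @Author liangfeng
 * @Date 2019-06-28 23:18
 * @Version 1.0
 **/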
public class Job13Partitioner extends Partitioner<Job13Data, DoubleWritable> {
    @Override
    public int getPartition(Job13Data job13Data, DoubleWritable doubleWritable, int numPartitions) {
        String orderId = job13Data.getOrderId();
        System.out.println("orderid"+orderId);
        return ((orderId.hashCode() & Integer.MAX_VALUE)%numPartitions);
    }
}

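Job13Group (grouping)

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @ClassName Job13Group
 * @Description Groups by order id
 * @Author liangfeng
 * @Date 2019-06-28 23:22
 * @Version 1.0
 **/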
public class Job13Group extends WritableComparator {

    public Job13Group(){
        super(Job13Data.class,true);
    }


    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("compare");
        Job13Data job13Data = (Job13Data) a;
        Job13Data job13Data1 = (Job13Data) b;
        System.out.println("compare"+job13Data.toString()+job13Data1.toString());
        return job13Data.getOrderId().compareTo(job13Data1.getOrderId());
    }
}

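Job13Reduce: the reducer iterates and writes the results

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @ClassName Job13Reduce
 * @Description Iterates over the values in reduce and writes Job13Data and price
 * @Author liangfeng
 * @Date 2019-06-28 23:34
 * @Version 1.0
 **/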
public class Job13Reduce extends Reducer<Job13Data, DoubleWritable,Job13Data,DoubleWritable> {


    @Override
    protected void reduce(Job13Data key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

        int i = 0;
        for (DoubleWritable value : values) {

            if(i>1){
                break;
            }
            System.out.println("value"+key.toString());
            context.write(key,value); // this is the line to watch
            i++;
        }
    }
}

Output. Format: order id from the key, order amount from the key, value
key value
1 2 2
1 3 3
1 4 4
1 5 5

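context.write(key,value); // this line
Look at the loop in reduce that writes the output. key is the first argument passed into reduce. As we iterate over values, value changes, so why does key change as well? Isn't key a single object, with no new object ever created?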

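2. The value object is always the same one while iterating over values

Requirement: suppose we need to iterate over the values parameter of reduce and put the values into a collection, in order to sort them, take the maximum, and so on.
Consider the following reduce code, again using the Job13Data bean as the type.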

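import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @ClassName Job14Reduce
 * @Description Iterates over values in reduce, stores each value in a collection, then prints the collection
 * @Author liangfeng
 * @Date 2019-06-28 23:34
 * @Version 1.0
 **/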
public class Job14Reduce extends Reducer<Job13Data, Job13Data,Job13Data, NullWritable> {

    @Override
    protected void reduce(Job13Data key, Iterable<Job13Data> values, Context context) throws IOException, InterruptedException {
        List<Job13Data> data = new LinkedList<Job13Data>();

        for (Job13Data value : values) {
            data.add(value);
        }
        for (Job13Data datum : data) {
            System.out.println(datum); // print the element's value
            //System.out.println(datum.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(datum))); // print the element's identity
        }
    }
}

Output
1 5
1 5
1 5
1 5

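All four results are the last record, and the printed references are identical too. Could it be that the iteration hands back the same object every time?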
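The "same object" suspicion can be checked without a cluster. The following is only a sketch in plain Java: Item is a hypothetical stand-in for Job13Data, and collectReused and allSameInstance are illustrative helper names, not Hadoop API. It stores one mutable instance in a list several times and compares the references with != and System.identityHashCode.

```java
import java.util.ArrayList;
import java.util.List;

class IdentityCheck {
    // Hypothetical stand-in for a mutable Writable-like value.
    static final class Item {
        double price;
        @Override public String toString() { return "1\t" + price; }
    }

    /** Adds the same mutable instance to a list once per price, mutating it each time. */
    static List<Item> collectReused(double[] prices) {
        List<Item> out = new ArrayList<>();
        Item reused = new Item();
        for (double p : prices) {
            reused.price = p;   // overwrite the single instance
            out.add(reused);    // stores a reference, not a copy
        }
        return out;
    }

    /** True when every element of the list is the very same object. */
    static boolean allSameInstance(List<?> list) {
        for (Object o : list) {
            if (o != list.get(0)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Item> data = collectReused(new double[] {2, 3, 4, 5});
        for (Item item : data) {
            // identityHashCode ignores any overridden hashCode()
            System.out.println(item + "  identity="
                    + Integer.toHexString(System.identityHashCode(item)));
        }
        System.out.println("same instance: " + allSameInstance(data));
    }
}
```

If every line prints the same identity, the list holds several references to one object, which is the pattern the reducer output above suggests.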

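II. Hypotheses and where to look

For the first problem, the key's contents may be modified while values is being iterated.
For the second problem, the Iterable that is passed in may be doing something that causes this.
Where to look: read the source of the Iterable that is passed in and check whether it does anything special when fetching the value and the key. Does iterating over values also modify key? Is there just one value object that gets reused during iteration?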

III. Source-code analysis

Reducer source

    /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }

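Here key and values are both obtained from the context object, so next we look at the context's implementation class.
The ReduceTask class has a createReduceContext() method that creates the context and passes it into the run method.
createReduceContext() creates a new ReduceContextImpl, so let's look at ReduceContextImpl in detail.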

/**
 * The context passed to the {@link Reducer}.
 * @param <KEYIN> the class of the input keys
 * @param <VALUEIN> the class of the input values
 * @param <KEYOUT> the class of the output keys
 * @param <VALUEOUT> the class of the output values
 */
public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
    implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  private RawKeyValueIterator input;
  private Counter inputValueCounter;
  private Counter inputKeyCounter;
  private RawComparator<KEYIN> comparator;
  private KEYIN key;                                  // current key
  private VALUEIN value;                              // current value
  private boolean firstValue = false;                 // first value in key
  private boolean nextKeyIsSame = false;              // more w/ this key
  private boolean hasMore;                            // more in file
  protected Progressable reporter;
  private Deserializer<KEYIN> keyDeserializer;
  private Deserializer<VALUEIN> valueDeserializer;
  private DataInputBuffer buffer = new DataInputBuffer();
  private BytesWritable currentRawKey = new BytesWritable();
  private ValueIterable iterable = new ValueIterable();
  private final SerializationFactory serializationFactory;
  private final Class<KEYIN> keyClass;
  private final Class<VALUEIN> valueClass;
  private final Configuration conf;
  private final TaskAttemptID taskid;
  private int currentKeyLength = -1;
  private int currentValueLength = -1;
  
  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           RawKeyValueIterator input, 
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException{
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
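    // deserializer for the key; the key class is passed in so that objects can be created from it during deserialization
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    // set the buffer that will be deserialized from
    this.keyDeserializer.open(buffer);
    // deserializer for the value, set up the same way
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);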
    hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }

  /** Start processing next unique key. */ 
  // called when iteration starts, from the run method of Reducer shown above
  public boolean nextKey() throws IOException,InterruptedException {
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue(); // nextKeyValue() is called once here
    } else {
      return false;
    }
  }

  /**
   * Advance to the next key/value pair.
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    DataInputBuffer nextKey = input.getKey(); // get the DataInputBuffer holding the key
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
                      nextKey.getLength() - nextKey.getPosition());
    // load the key's bytes into buffer for deserialization
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    // call deserialize, passing in the current key object
    key = keyDeserializer.deserialize(key);
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(),
        nextVal.getLength() - nextVal.getPosition());
    // call deserialize for the value, passing in the current value object
    value = valueDeserializer.deserialize(value);

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    hasMore = input.next();
    if (hasMore) {
      nextKey = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
  }

  public KEYIN getCurrentKey() {
    return key;
  }

  @Override
  public VALUEIN getCurrentValue() {
    return value;
  }
  
  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {

    @Override
    public boolean hasNext() {
      return firstValue || nextKeyIsSame;
    }

    @Override
    public VALUEIN next() {

      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        // each call to nextKeyValue() overwrites the fields of key and value; value is then returned
        nextKeyValue();
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie);        
      }
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("remove not implemented");
    }

    /**
     * This method is called to write the record that was most recently
     * served (before a call to the mark). Since the framework reads one
     * record in advance, to get this record, we serialize the current key
     * and value
     * @param out
     * @throws IOException
     */
    private void writeFirstKeyValueBytes(DataOutputStream out) 
    throws IOException {
      assert (getCurrentKey() != null && getCurrentValue() != null);
      WritableUtils.writeVInt(out, currentKeyLength);
      WritableUtils.writeVInt(out, currentValueLength);
      Serializer<KEYIN> keySerializer = 
        serializationFactory.getSerializer(keyClass);
      keySerializer.open(out);
      keySerializer.serialize(getCurrentKey());

      Serializer<VALUEIN> valueSerializer = 
        serializationFactory.getSerializer(valueClass);
      valueSerializer.open(out);
      valueSerializer.serialize(getCurrentValue());
    }
  }

  protected class ValueIterable implements Iterable<VALUEIN> {
    private ValueIterator iterator = new ValueIterator();
    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    } 
  }
  
  /**
   * Iterate through the values for the current key, reusing the same value 
   * object, which is stored in the context.
   * @return the series of values associated with the current key. All of the 
   * objects returned directly and indirectly from this method are reused.
   */
  public 
  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
    return iterable;
  }
}

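Looking at the code above, iteration mainly comes down to calling nextKeyValue().
That method deserializes the data to obtain the key and value objects.
Next, let's see how key and value are obtained.
Taking the line value = valueDeserializer.deserialize(value); as the example, the deserialize method is as follows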

    
    @Override
    public Writable deserialize(Writable w) throws IOException {
      Writable writable;
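      // if the object passed in is null, create one from the class
      if (w == null) {
        writable 
          = (Writable) ReflectionUtils.newInstance(writableClass, getConf()); // create the value object
      } else {
        writable = w;
      }
      // call the object's readFields method (from the Writable interface) to fill in its fields; dataIn is the input stream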
      writable.readFields(dataIn);
      return writable;
    }
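The "allocate only when null, otherwise refill" branch can be reproduced with nothing but java.io. This is a sketch under stated assumptions: Price is a hypothetical class with a Writable-style readFields method, and deserialize and streamOf are illustrative helpers that imitate the shape of the Hadoop method above rather than call it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class ReuseDeserializeDemo {
    // Hypothetical record with a Writable-style readFields method.
    static final class Price {
        double amount;
        void readFields(DataInput in) throws IOException {
            amount = in.readDouble();
        }
    }

    // Imitates the branch in deserialize(): allocate only when given null,
    // then overwrite the fields of whichever object is in hand.
    static Price deserialize(Price reuse, DataInput in) throws IOException {
        Price target = (reuse == null) ? new Price() : reuse;
        target.readFields(in);
        return target;
    }

    // Serializes the given amounts and returns a stream positioned at the start.
    static DataInput streamOf(double... amounts) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (double a : amounts) {
            out.writeDouble(a);
        }
        out.flush();
        return new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    }

    public static void main(String[] args) throws IOException {
        DataInput in = streamOf(2.0, 3.0);
        Price first = deserialize(null, in);    // null: a new object is created
        double seen = first.amount;             // remember the first record's amount
        Price second = deserialize(first, in);  // non-null: the same object is refilled
        System.out.println(seen + " -> " + second.amount
                + ", same object: " + (first == second));
    }
}
```

After the second call, first and second refer to one object, so the first record's amount survives only in the local variable seen.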

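This is the root of the problem. What can we see here?
The only place in the whole flow where an object is created is
writable = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
and once the value passed in is not null, nothing new is created: the fields of the existing value object are simply overwritten.
Summary
Hadoop creates only one key object and one value object per reduce context.
During iteration it keeps overwriting the fields of this key and this value, and key and value are always assigned as a pair.
So there is only ever one value object,
and iterating over the values also changes the key's field values.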
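Both effects can be seen together in a small plain-Java model. It is deliberately simplified and is not the Hadoop implementation: there is a single group, no look-ahead to the next key, and Key, Value, Values and reduce are hypothetical names loosely modelled on Job13Data and the reducer above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ReuseIteratorDemo {
    // Hypothetical mutable key, loosely modelled on Job13Data.
    static final class Key {
        String orderId;
        double price;
        @Override public String toString() { return orderId + "\t" + price; }
    }

    static final class Value {
        double price;
    }

    // One Key and one Value are refilled for every record of the group.
    static final class Values implements Iterable<Value>, Iterator<Value> {
        private final String orderId;
        private final double[] prices;
        private int index = 0;
        final Key key = new Key();       // the only Key instance
        final Value value = new Value(); // the only Value instance

        Values(String orderId, double[] prices) {
            this.orderId = orderId;
            this.prices = prices;
        }

        @Override public Iterator<Value> iterator() { return this; }

        @Override public boolean hasNext() { return index < prices.length; }

        @Override public Value next() {
            if (!hasNext()) {
                throw new NoSuchElementException("iterate past last value");
            }
            // Like nextKeyValue(): key and value are overwritten as a pair.
            key.orderId = orderId;
            key.price = prices[index];
            value.price = prices[index];
            index++;
            return value;
        }
    }

    // Reducer-style loop: the key argument is never reassigned here,
    // yet its contents differ on every iteration.
    static List<String> reduce(Key key, Iterable<Value> values) {
        List<String> lines = new ArrayList<>();
        for (Value v : values) {
            lines.add(key + "\t" + v.price);
        }
        return lines;
    }

    public static void main(String[] args) {
        Values values = new Values("1", new double[] {2, 3, 4, 5});
        for (String line : reduce(values.key, values)) {
            System.out.println(line);
        }
    }
}
```

Each line is built while the shared key holds the current record, which is why writing key and value immediately inside the loop behaves as expected while storing the references for later does not.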

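IV. Ideas for avoiding the problem

1. For the first problem: in Hadoop's design, the type parameter of ValueIterator could be a key/value pair (in the style of an entry set), which would avoid the misunderstanding.
2. For the second problem: Hadoop presumably reuses objects to cut the memory cost of creating new ones. The Writable interface could add a method that returns a new object, making it easy for users to put values into a collection.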
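Until something like that exists, user code can take its own snapshot of each value before storing it. The sketch below shows the idea in plain Java with a hypothetical Item class and copy constructor; it is an illustration, not the author's reducer. In a real job, Hadoop's WritableUtils.clone(value, context.getConfiguration()) should serve the same purpose, though that call is not exercised here.

```java
import java.util.ArrayList;
import java.util.List;

class CopyBeforeStoreDemo {
    // Hypothetical stand-in for Job13Data with a copy constructor.
    static final class Item {
        String orderId;
        double price;

        Item() {}

        Item(Item other) {
            this.orderId = other.orderId;
            this.price = other.price;
        }
    }

    /** Collects an independent copy of the reused object for every price. */
    static List<Item> collectCopies(String orderId, double[] prices) {
        List<Item> out = new ArrayList<>();
        Item reused = new Item(); // plays the role of the framework's single value object
        for (double p : prices) {
            reused.orderId = orderId;
            reused.price = p;
            out.add(new Item(reused)); // snapshot the current state
        }
        return out;
    }

    public static void main(String[] args) {
        for (Item item : collectCopies("1", new double[] {2, 3, 4, 5})) {
            System.out.println(item.orderId + "\t" + item.price);
        }
    }
}
```

The cost is one allocation per stored value, so copying is worth doing only when the values really need to outlive the loop iteration.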


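Copyright notice: this is an original article by u013052725, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.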