A first read of Hadoop: the bidirectional mechanism of the data pipeline when writing data

To guarantee the consistency and integrity of distributed data, Hadoop's write data flow uses a bidirectional mechanism: data flows downstream through the pipeline while acknowledgments flow back upstream.


The point worth emphasizing is that the reverse acknowledgments actually come in two parts:
1. Write-request ack:
     Normally this ack starts at the last datanode in the pipeline and travels back toward the client. Every node on the pipeline waits for this ack and only starts accepting data after receiving it; likewise, the client waits for this ack before it begins sending data. The ack is synchronous: nothing proceeds to the next step until it arrives. The ack packet has only two fields, a return code and additional info; when the return code is OP_STATUS_ERROR, the additional info carries the address of the first failing datanode in the pipeline.
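The two-field setup ack described above can be sketched with plain java.io streams. This is a simplified stand-in, not the real HDFS wire format: the class name and constants are my own, and writeUTF/readUTF stand in for Text.writeString/Text.readString.

```java
import java.io.*;

public class SetupAckSketch {
    static final short OP_STATUS_SUCCESS = 0;
    static final short OP_STATUS_ERROR   = 1;

    // Serialize a setup ack: a status code plus the first failing
    // datanode's address (empty string when everything succeeded).
    static byte[] writeAck(short status, String firstBadLink) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeShort(status);
        out.writeUTF(firstBadLink); // stand-in for Text.writeString
        out.flush();
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // A failure ack naming the broken node; on seeing this, the client
        // would rebuild the pipeline without that node.
        byte[] ack = writeAck(OP_STATUS_ERROR, "10.0.0.3:50010");
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(ack));
        short status = in.readShort();   // the upstream side blocks here until the ack arrives
        String firstBadLink = in.readUTF();
        System.out.println(status + " " + firstBadLink);
    }
}
```

The blocking readShort() is what makes this ack synchronous from the upstream node's point of view.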

2. Write-data ack:
     The client sends data through the pipeline. After a datanode on the pipeline receives the data and writes it to disk, it must send a confirmation packet to its upstream node so the upstream can clear the packet from its buffer. Confirmation packets start at the last datanode and travel upstream, all the way back to the data source. The class corresponding to this ack packet is:
DataTransferProtocol.PipelineAck.
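As a rough sketch of what PipelineAck carries (the sequence number of the acknowledged packet plus one status per datanode), here is a simplified stand-in class; the field names and exact wire layout are my assumptions, not the real implementation:

```java
import java.io.*;

// Simplified stand-in for DataTransferProtocol.PipelineAck:
// a packet sequence number plus one status short per datanode.
public class MiniPipelineAck {
    final long seqno;
    final short[] replies;

    MiniPipelineAck(long seqno, short[] replies) {
        this.seqno = seqno;
        this.replies = replies;
    }

    void write(DataOutput out) throws IOException {
        out.writeLong(seqno);
        out.writeShort((short) replies.length);
        for (short r : replies) out.writeShort(r);
    }

    static MiniPipelineAck read(DataInput in) throws IOException {
        long seq = in.readLong();
        int n = in.readShort();
        short[] rs = new short[n];
        for (int i = 0; i < n; i++) rs[i] = in.readShort();
        return new MiniPipelineAck(seq, rs);
    }

    public static void main(String[] args) throws IOException {
        // Round-trip an ack for packet 42 with two successful replies.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new MiniPipelineAck(42L, new short[]{0, 0}).write(new DataOutputStream(buf));
        MiniPipelineAck ack =
            read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(ack.seqno + " " + ack.replies.length);
    }
}
```

The sequence number lets the upstream node match each ack against the packet it is still holding in its buffer before discarding it.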


Below is slightly abridged code, with comments marking the points I think deserve attention:

1. Code handling the write-request ack:

/**
 * 1. The current node's handling of the write-request ack
 */
if (targets.length > 0) {
        InetSocketAddress mirrorTarget = null;
        mirrorNode = targets[0].getName();
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        try {
          int timeoutValue = datanode.socketTimeout +
                             (HdfsConstants.READ_TIMEOUT_EXTENSION * numTargets);
          int writeTimeout = datanode.socketWriteTimeout + 
                             (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
          mirrorOut = new DataOutputStream(
              new BufferedOutputStream(
                  NetUtils.getOutputStream(mirrorSock, writeTimeout),
                  SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
          mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
          mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
          mirrorOut.writeLong( block.getBlockId() );
          mirrorOut.writeLong( block.getGenerationStamp() );
          mirrorOut.writeInt( pipelineSize );
          mirrorOut.writeBoolean( isRecovery );
          Text.writeString( mirrorOut, client );
          mirrorOut.writeBoolean(hasSrcDataNode);
          if (hasSrcDataNode) { 
            srcDataNode.write(mirrorOut);
          }
          mirrorOut.writeInt( targets.length - 1 );
          for ( int i = 1; i < targets.length; i++ ) {
            targets[i].write( mirrorOut );
          }
          accessToken.write(mirrorOut);

          blockReceiver.writeChecksumHeader(mirrorOut);
          mirrorOut.flush();
          if (client.length() != 0) {
            /**
             * After mirrorOut.flush(), synchronously wait for the ack
             * via mirrorIn.readShort() below.
             */
            mirrorInStatus = mirrorIn.readShort();
            firstBadLink = Text.readString(mirrorIn);
            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " + firstBadLink);
            }
          }

        } catch (IOException e) {
          /**
           * If an IOException occurs while setting up mirrorSock (mirrorOut and
           * mirrorIn), reply with the OP_STATUS_ERROR return code plus, as the
           * additional info, the address of the first failing datanode.
           */
          /**
           * It is the status and mirrorNode written here that truly correspond
           * to the current datanode's own failure.
           */
          if (client.length() != 0) {
            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
            Text.writeString(replyOut, mirrorNode);
            replyOut.flush();
          }
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          if (client.length() > 0) {
            throw e;
          } else {
            LOG.info(datanode.dnRegistration + ":Exception transfering block " +
                     block + " to mirror " + mirrorNode +
                     ". continuing without the mirror.\n" +
                     StringUtils.stringifyException(e));
          }
        }
      }
/**
 * 2. Handling the write-request ack fed back by the next datanode
 */
if (client.length() != 0) {
        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
          LOG.info("Datanode " + targets.length +
                   " forwarding connect ack to upstream firstbadlink is " +
                   firstBadLink);
        }
        /**
         * Note that mirrorInStatus and firstBadLink are actually the
         * downstream datanode's status (they were read from mirrorIn above).
         */
        replyOut.writeShort(mirrorInStatus);
        Text.writeString(replyOut, firstBadLink);
        replyOut.flush();
      }

2. Code handling the write-data ack:

/**
 * The main logic is in PacketResponder.run()
 */
.............
short[] replies = null;
if (mirrorError) { // no ack was read from downstream
    replies = new short[2];
    replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
    replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
} else { // build a success reply
    /**
     * ack.getNumOfReplies(): collects the results from the downstream datanodes
     */
    short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
    replies = new short[1 + ackLen];
    replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
    for (int i = 0; i < ackLen; i++) {
        // append the downstream replies after this node's own status
        replies[i + 1] = ack.getReply(i);
    }
}
PipelineAck replyAck = new PipelineAck(expected, replies);
replyAck.write(replyOut); // send the ack upstream
replyOut.flush();
if (LOG.isDebugEnabled()) {
    LOG.debug("PacketResponder " + numTargets + " for block " + block +
              " responded an ack: " + replyAck);
}
 ...........
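The replies[] construction above (prepend this node's own status, then copy the statuses collected from downstream) can be exercised in isolation. The helper below is my own illustration of that pattern, not Hadoop code:

```java
import java.util.Arrays;

public class AckForwarding {
    static final short OP_STATUS_SUCCESS = 0;
    static final short OP_STATUS_ERROR   = 1;

    // Build the ack this node sends upstream: its own status first,
    // followed by the statuses taken from the downstream ack.
    static short[] buildReplies(short ownStatus, short[] downstream) {
        short[] replies = new short[1 + downstream.length];
        replies[0] = ownStatus;
        System.arraycopy(downstream, 0, replies, 1, downstream.length);
        return replies;
    }

    public static void main(String[] args) {
        // Second-to-last node: the last node reported success, so this
        // node forwards [SUCCESS, SUCCESS] upstream.
        short[] ok = buildReplies(OP_STATUS_SUCCESS, new short[]{OP_STATUS_SUCCESS});
        System.out.println(Arrays.toString(ok)); // [0, 0]

        // Downstream failed: the error status propagates upstream while
        // this node still reports itself healthy in slot 0.
        short[] bad = buildReplies(OP_STATUS_SUCCESS, new short[]{OP_STATUS_ERROR});
        System.out.println(Arrays.toString(bad)); // [0, 1]
    }
}
```

Because every node prepends its own slot, the client receives one status per pipeline node and can identify exactly which datanode failed.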

Copyright notice: this is an original article by zhusirong, released under the CC 4.0 BY-SA license; when reposting, please include a link to the original source and this notice.