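To guarantee the consistency and integrity of distributed data, Hadoop (HDFS) uses a bidirectional mechanism when writing a data stream: data flows down the pipeline, and acknowledgements (acks) flow back up.
What I want to stress here is that the upstream (reverse) ack actually consists of two parts: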
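1. Write-request ack:
Normally this ack (called the "connect ack" in the code below) starts at the last datanode in the pipeline and travels back toward the client. Every node in the pipeline waits for this ack from downstream, relays it upstream, and only then starts receiving data; in other words, the client waits for this ack before it starts sending data. The ack is synchronous: nothing proceeds until it arrives. The ack packet has only two fields, a status code and additional information. When the status code is OP_STATUS_ERROR, the additional information gives the address of the first failed datanode in the pipeline.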
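To make the two-field layout concrete, here is a minimal Java sketch that writes such an ack to an in-memory stream and reads it back. It is not Hadoop code: the class and method names are hypothetical, the status values 0/1 are placeholders, and `DataOutputStream.writeUTF` stands in for Hadoop's `Text.writeString`, which uses a different length encoding on the wire.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ConnectAckDemo {
    // Placeholder status values for this sketch (not taken from Hadoop).
    public static final short OP_STATUS_SUCCESS = 0;
    public static final short OP_STATUS_ERROR = 1;

    /** The two fields of the write-request ack: status code + additional info. */
    public record ConnectAck(short status, String firstBadLink) {

        public void write(DataOutputStream out) throws IOException {
            out.writeShort(status);     // field 1: status code
            out.writeUTF(firstBadLink); // field 2: first failed datanode, "" on success (writeUTF is a stand-in)
            out.flush();                // push it out so the waiting upstream reader receives it
        }

        public static ConnectAck read(DataInputStream in) throws IOException {
            short status = in.readShort(); // on a socket this call blocks: the ack is synchronous
            String firstBadLink = in.readUTF();
            return new ConnectAck(status, firstBadLink);
        }
    }

    /** Writes an ack to an in-memory buffer and reads it back. */
    public static ConnectAck roundTrip(ConnectAck ack) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            ack.write(new DataOutputStream(buf));
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            return ConnectAck.read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ConnectAck ack = roundTrip(new ConnectAck(OP_STATUS_ERROR, "10.0.0.3:50010"));
        System.out.println(ack); // ConnectAck[status=1, firstBadLink=10.0.0.3:50010]
    }
}
```

Keeping the ack this small is what makes the synchronous wait cheap: a node only needs to read a short and one string before deciding whether the pipeline is usable.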
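2. Write-data ack:
The client sends data through the pipeline. After a datanode has received a packet and written it to disk, it must send an ack packet to its upstream node so that the upstream node can clear that packet from its buffer. The ack is sent by the last datanode and travels upstream all the way back to the data source (the client). The class that represents this ack packet is: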
DataTransferProtocol.PipelineAck.
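The point of the write-data ack is that the sender may only discard a packet once the whole downstream pipeline has confirmed it. The sketch below models that buffer as a queue of unacknowledged packets. It assumes acks arrive in the same order the packets were sent and that 0 means success; the class and method names and the error handling (simply throwing) are hypothetical simplifications, not the `PipelineAck` API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AckQueueDemo {
    // Placeholder success value for this sketch.
    public static final short OP_STATUS_SUCCESS = 0;

    public record Packet(long seqno, byte[] data) {}

    // Packets already sent downstream but not yet acknowledged (the "buffer").
    private final Deque<Packet> ackQueue = new ArrayDeque<>();

    public void sent(Packet p) {
        ackQueue.addLast(p);
    }

    /** Handles one ack; on success the acknowledged packet leaves the buffer. */
    public Packet onAck(long seqno, short[] replies) {
        for (short reply : replies) {
            if (reply != OP_STATUS_SUCCESS) {
                // The real client handles this with pipeline recovery; the sketch just fails.
                throw new IllegalStateException("pipeline error in ack for seqno " + seqno);
            }
        }
        Packet head = ackQueue.peekFirst();
        if (head == null || head.seqno() != seqno) {
            throw new IllegalStateException("ack " + seqno + " does not match the oldest pending packet");
        }
        return ackQueue.removeFirst(); // safe to forget this packet now
    }

    public int pending() {
        return ackQueue.size();
    }
}
```

For example, after sending packets 0 and 1, an all-success ack for seqno 0 releases packet 0 and leaves one packet buffered, while an ack for any other seqno is rejected.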
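Below is a slightly simplified version of the code, with comments pointing out the places I think deserve attention:
1. Code that handles the write-request ack: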
/**
 * 1. Write-request ack handling on the current node
 */
if (targets.length > 0) {
    // Connect to the next datanode in the pipeline (the "mirror")
    InetSocketAddress mirrorTarget = null;
    mirrorNode = targets[0].getName();
    mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
    mirrorSock = datanode.newSocket();
    try {
        // Timeouts grow with the number of downstream targets
        int timeoutValue = datanode.socketTimeout +
            (HdfsConstants.READ_TIMEOUT_EXTENSION * numTargets);
        int writeTimeout = datanode.socketWriteTimeout +
            (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
        NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
        mirrorSock.setSoTimeout(timeoutValue);
        mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
        mirrorOut = new DataOutputStream(
            new BufferedOutputStream(
                NetUtils.getOutputStream(mirrorSock, writeTimeout),
                SMALL_BUFFER_SIZE));
        mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));

        // Forward the OP_WRITE_BLOCK request header to the mirror;
        // the target list sent on excludes the mirror itself
        mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
        mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
        mirrorOut.writeLong( block.getBlockId() );
        mirrorOut.writeLong( block.getGenerationStamp() );
        mirrorOut.writeInt( pipelineSize );
        mirrorOut.writeBoolean( isRecovery );
        Text.writeString( mirrorOut, client );
        mirrorOut.writeBoolean(hasSrcDataNode);
        if (hasSrcDataNode) {
            srcDataNode.write(mirrorOut);
        }
        mirrorOut.writeInt( targets.length - 1 );
        for ( int i = 1; i < targets.length; i++ ) {
            targets[i].write( mirrorOut );
        }
        accessToken.write(mirrorOut);
        blockReceiver.writeChecksumHeader(mirrorOut);
        mirrorOut.flush();
        if (client.length() != 0) {
            /**
             * After mirrorOut.flush(), block synchronously on
             * mirrorIn.readShort() until the downstream ack arrives
             */
            mirrorInStatus = mirrorIn.readShort();
            firstBadLink = Text.readString(mirrorIn);
            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
                LOG.info("Datanode " + targets.length +
                         " got response for connect ack " +
                         " from downstream datanode with firstbadlink as " + firstBadLink);
            }
        }
    } catch (IOException e) {
        /**
         * If an IOException occurs while setting up mirrorSock (mirrorOut and mirrorIn)
         * or talking to the mirror, reply with the DataTransferProtocol.OP_STATUS_ERROR
         * status code plus additional information naming the first failed datanode.
         */
        /**
         * Only here are the status and mirrorNode produced by the current datanode itself:
         * it failed to reach its mirror, so it names mirrorNode as the first bad link.
         */
        if (client.length() != 0) {
            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
            Text.writeString(replyOut, mirrorNode);
            replyOut.flush();
        }
        IOUtils.closeStream(mirrorOut);
        mirrorOut = null;
        IOUtils.closeStream(mirrorIn);
        mirrorIn = null;
        IOUtils.closeSocket(mirrorSock);
        mirrorSock = null;
        if (client.length() > 0) {
            throw e;
        } else {
            LOG.info(datanode.dnRegistration + ":Exception transfering block " + block + " to mirror " + mirrorNode + ". continuing without the mirror.\n" + StringUtils.stringifyException(e));
        }
    }
}
/**
 * 2. Handling the write-request ack received from the next node
 */
if (client.length() != 0) {
    if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
        LOG.info("Datanode " + targets.length +
                 " forwarding connect ack to upstream firstbadlink is " +
                 firstBadLink);
    }
    /**
     * mirrorInStatus and firstBadLink are really the next datanode's status
     * (they were read from mirrorIn in step 1 above); this node only relays them upstream.
     */
    replyOut.writeShort(mirrorInStatus);
    Text.writeString(replyOut, firstBadLink);
    replyOut.flush();
}
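Taken together, the two steps mean the client learns the first unreachable datanode in pipeline order: that node's upstream neighbour names it in step 1, and every node before it relays the answer unchanged in step 2. The following simulation is a hypothetical sketch: it replaces sockets with recursion over a list of node names, treats a node listed in `unreachable` as one its upstream neighbour cannot connect to, assumes a client-initiated write (`client.length() != 0`), and uses placeholder status values.

```java
import java.util.List;
import java.util.Set;

public class ConnectAckPipeline {
    // Placeholder status values for this sketch (not taken from Hadoop).
    public static final short OP_STATUS_SUCCESS = 0;
    public static final short OP_STATUS_ERROR = 1;

    /** The connect ack one node sends upstream: status + firstBadLink. */
    public record Ack(short status, String firstBadLink) {}

    /** Returns the ack that pipeline.get(i) sends to its upstream neighbour. */
    public static Ack connectAck(List<String> pipeline, int i, Set<String> unreachable) {
        if (i == pipeline.size() - 1) {
            // Last datanode: no mirror, so it answers success with an
            // empty firstBadLink (assumed initial values).
            return new Ack(OP_STATUS_SUCCESS, "");
        }
        String mirrorNode = pipeline.get(i + 1);
        if (unreachable.contains(mirrorNode)) {
            // Step 1 (catch block): this node itself blames its mirror.
            return new Ack(OP_STATUS_ERROR, mirrorNode);
        }
        // Step 2: relay the mirror's status and firstBadLink unchanged.
        return connectAck(pipeline, i + 1, unreachable);
    }

    public static void main(String[] args) {
        List<String> pipeline = List.of("dn1", "dn2", "dn3");
        System.out.println(connectAck(pipeline, 0, Set.of()));      // Ack[status=0, firstBadLink=]
        System.out.println(connectAck(pipeline, 0, Set.of("dn3"))); // Ack[status=1, firstBadLink=dn3]
    }
}
```

In the second call dn2 cannot reach dn3, reports it, and dn1 forwards that report to the client verbatim, which is exactly why the comment above says the relayed values belong to the next datanode.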
2. Code that handles the write-data ack:
/**
 * The main logic lives in PacketResponder.run()
 */
// ...
short[] replies = null;
if (mirrorError) { // no ack is read
    // This node succeeded, but its mirror is reported as failed
    replies = new short[2];
    replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
    replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
} else { // build a success ack
    /**
     * ack.getNumOfReplies(): collect the results reported by the downstream datanodes
     */
    short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
    replies = new short[1+ackLen];
    replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
    for (int i=0; i<ackLen; i++) {
        // append the downstream replies
        replies[i+1] = ack.getReply(i);
    }
}
PipelineAck replyAck = new PipelineAck(expected, replies);
replyAck.write(replyOut); // send the ack upstream
replyOut.flush();
if (LOG.isDebugEnabled()) {
    LOG.debug("PacketResponder " + numTargets + " for block " + block + " responded an ack: " + replyAck);
}
// ...
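Because each PacketResponder puts its own status in slot 0 and appends the replies it received, the ack that reaches the client carries one status per datanode, in pipeline order. The sketch below reproduces just that array-building step from the excerpt as a standalone function; the class and function names and the placeholder constants are assumptions, and the real code additionally wraps the array in a `PipelineAck` together with the packet's sequence number (`expected`).

```java
import java.util.Arrays;

public class DataAckReplies {
    // Placeholder status values for this sketch (not taken from Hadoop).
    public static final short OP_STATUS_SUCCESS = 0;
    public static final short OP_STATUS_ERROR = 1;

    /**
     * @param mirrorError       true if no ack could be read from the mirror
     * @param downstreamReplies replies in the mirror's ack (unused for the last node or on mirrorError)
     * @param numTargets        number of downstream datanodes
     */
    public static short[] buildReplies(boolean mirrorError, short[] downstreamReplies, int numTargets) {
        if (mirrorError) {
            // This node is fine, but its immediate mirror is reported as failed.
            return new short[] {OP_STATUS_SUCCESS, OP_STATUS_ERROR};
        }
        int ackLen = numTargets == 0 ? 0 : downstreamReplies.length;
        short[] replies = new short[1 + ackLen];
        replies[0] = OP_STATUS_SUCCESS;       // this node's own status
        for (int i = 0; i < ackLen; i++) {
            replies[i + 1] = downstreamReplies[i]; // append the downstream replies
        }
        return replies;
    }

    public static void main(String[] args) {
        // Healthy three-node pipeline, built from the tail (numTargets == 0) upward.
        short[] tail = buildReplies(false, null, 0);
        short[] middle = buildReplies(false, tail, 1);
        System.out.println(Arrays.toString(buildReplies(false, middle, 2))); // [0, 0, 0]

        // The middle node could not read an ack from the tail.
        short[] middleFailed = buildReplies(true, null, 1);
        System.out.println(Arrays.toString(buildReplies(false, middleFailed, 2))); // [0, 0, 1]
    }
}
```

In the `mirrorError` branch the array always has length 2, so the failure is pinned on the immediate mirror of the node that noticed it; in the second example the error lands at index 2, i.e. the third datanode.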
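Copyright notice: this is an original article by zhusirong, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.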