AdaptiveRecvByteBufAllocator 决定了要分配的ByteBuf的容量。
RecvByteBufAllocator
public interface RecvByteBufAllocator { //创建一个新的句柄。句柄提供实际操作并保留内部信息,这是预测最佳缓冲区容量所必需的。 Handle newHandle(); @Deprecated interface Handle { //创建一个新的接收缓冲区,其容量可能足够大以读取所有入站数据,而又很小以不浪费其空间。 ByteBuf allocate(ByteBufAllocator alloc); //与{@link #allocate(ByteBufAllocator)}类似,除了它不分配任何内容,仅告诉容量。 int guess(); //重置所有已累积的计数器,并建议下一个读取循环应读取多少消息/字节。 //{@link #continueReading()}可以使用它来确定读取操作是否应该完成。(这只是一个提示,可能会被实现忽略。) void reset(ChannelConfig config); //增加当前读取循环已读取的消息数。 void incMessagesRead(int numMessages); //设置上一次读取操作已读取的字节。这可用于增加已读取的字节数。 void lastBytesRead(int bytes); //获取上一次读取操作的字节数。 int lastBytesRead(); //设置读取操作将尝试读取或读取的字节数。 void attemptedBytesRead(int bytes); //获取读取操作将(或确实)尝试读取的字节数。 int attemptedBytesRead(); //确定当前的读取循环是否应该继续。 boolean continueReading(); //读取已完成。 void readComplete(); } @SuppressWarnings("deprecation") @UnstableApi interface ExtendedHandle extends Handle { //与{@link Handle#continueReading()}相同,只是“更多数据”由供应商参数确定。 boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier); } }
MaxMessagesRecvByteBufAllocator
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator { //返回每个读取循环要读取的最大消息数。 int maxMessagesPerRead(); //设置每个读取循环读取的最大消息数。如果此值大于1,则事件循环可能会尝试多次读取以获取多条消息。 MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead); }
DefaultMaxMessagesRecvByteBufAllocator
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator { //默认为1 private volatile int maxMessagesPerRead; //默认为true private volatile boolean respectMaybeMoreData = true; //构造方法 public DefaultMaxMessagesRecvByteBufAllocator() { this(1); } //构造方法 public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) { maxMessagesPerRead(maxMessagesPerRead); } @Override public int maxMessagesPerRead() { return maxMessagesPerRead; } @Override public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) { checkPositive(maxMessagesPerRead, "maxMessagesPerRead"); this.maxMessagesPerRead = maxMessagesPerRead; return this; } public DefaultMaxMessagesRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) { this.respectMaybeMoreData = respectMaybeMoreData; return this; } public final boolean respectMaybeMoreData() { return respectMaybeMoreData; } }静态内部类 MaxMessageHandle
public abstract class MaxMessageHandle implements ExtendedHandle { private ChannelConfig config; //每次循环读取的最大消息数 private int maxMessagePerRead; //循环读取的消息总数 private int totalMessages; //循环读取的字节总数 private int totalBytesRead; //每次循环期望读取的字节数 private int attemptedBytesRead; //上次循环读取的字节数 private int lastBytesRead; //默认为true private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData; //一个Boolean提供商 private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { @Override public boolean get() { //如果attemptedBytesRead == lastBytesRead 那么证明可能还有数据待读取 return attemptedBytesRead == lastBytesRead; } }; /** * 重置 */ @Override public void reset(ChannelConfig config) { this.config = config; maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; } //分配一个读取的ByteBuf @Override public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(guess()); } @Override public final void incMessagesRead(int amt) { totalMessages += amt; } @Override public void lastBytesRead(int bytes) { lastBytesRead = bytes; if (bytes > 0) { totalBytesRead += bytes; } } @Override public final int lastBytesRead() { return lastBytesRead; } @Override public boolean continueReading() { return continueReading(defaultMaybeMoreSupplier); } //这里逻辑比较重要,决定了读取循环是否继续执行 @Override public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0; } @Override public void readComplete() { } @Override public int attemptedBytesRead() { return attemptedBytesRead; } @Override public void attemptedBytesRead(int bytes) { attemptedBytesRead = bytes; } protected final int totalBytesRead() { return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead; } }
AdaptiveRecvByteBufAllocator
静态属性
//最小容量 static final int DEFAULT_MINIMUM = 64; //初始容量 static final int DEFAULT_INITIAL = 1024; //最大容量 static final int DEFAULT_MAXIMUM = 65536; //SIZE_TABLE下标增量 private static final int INDEX_INCREMENT = 4; //SIZE_TABLE下标减量 private static final int INDEX_DECREMENT = 1; //容量数组 大小为53 private static final int[] SIZE_TABLE;SIZE_TABLE的初始化
static { List<Integer> sizeTable = new ArrayList<Integer>(); //元素均为16的倍数,共31个 for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } // 512 = 1 << 9 , 直到 1 << 30 共22个 元素均为2的指数 for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } }属性
//计算后为3 private final int minIndex; //计算后为38 private final int maxIndex; //默认=DEFAULT_INITIAL=1024 private final int initial;构造方法
public AdaptiveRecvByteBufAllocator() { this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM); } public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { checkPositive(minimum, "minimum"); if (initial < minimum) { throw new IllegalArgumentException("initial: " + initial); } if (maximum < initial) { throw new IllegalArgumentException("maximum: " + maximum); } int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; } //使用二分法查找合适的SIZE_TABLE下标 private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } //这里注意一下,+比>>>的优先级高 int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { //进入这里的条件是 a < size <= b return mid + 1; } } }其它方法实现
@Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); } @Override public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) { super.respectMaybeMoreData(respectMaybeMoreData); return this; }内部类HandleImpl
private final class HandleImpl extends MaxMessageHandle { //最小容量下标 3 代表容量为64 private final int minIndex; //最大容量下标 38 代表容量为65536 private final int maxIndex; //初始容量下标 32 代表容量为1024 private int index; //下一个接收缓冲区大小 初始值是1024 private int nextReceiveBufferSize; //立即减少 初始值false private boolean decreaseNow; HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; } /** * 如果我们阅读的内容与要求的一样多,则应检查是否需要增加下一个猜测的大小。 * 这有助于在挂起大量数据时更快地进行调整,并且可以避免返回选择器以检查更多数据。返回选择器可能会为大型数据传输增加大量的延迟。 */ @Override public void lastBytesRead(int bytes) { if (bytes == attemptedBytesRead()) { record(bytes); } super.lastBytesRead(bytes); } @Override public int guess() { return nextReceiveBufferSize; } private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) { if (decreaseNow) { index = max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } } @Override public void readComplete() { record(totalBytesRead()); } }
AbstractNioByteChannel内部类NioByteUnsafe 使用 AdaptiveRecvByteBufAllocator
@Override public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();❶ allocHandle.reset(config);❷ ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator);❸ allocHandle.lastBytesRead( doReadBytes(byteBuf)❹ );❺ if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1);❻ readPending = false; pipeline.fireChannelRead(byteBuf);❼ byteBuf = null; } while (allocHandle.continueReading());❽ allocHandle.readComplete();❾ pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }❶ 获取一个RecvByteBufAllocator.Handle
@Override public RecvByteBufAllocator.Handle recvBufAllocHandle() { if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; } @Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); } HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; }❷对RecvByteBufAllocator.Handle进行重置
@Override public void reset(ChannelConfig config) { this.config = config; //maxMessagePerRead = 16 maxMessagePerRead = maxMessagesPerRead(); //清零 totalMessages = totalBytesRead = 0; }❸ 分配一个ByteBuf
@Override public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(guess()); } @Override public int guess() { //nextReceiveBufferSize默认为1024,在第❶步已进行初始化 return nextReceiveBufferSize; }❹读取数据到ByteBuf
@Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //设置要读取的字节数量 allocHandle.attemptedBytesRead(byteBuf.writableBytes()); //读取,并返回实际读取的字节数量 return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }❺设置本次读取的字节数量
@Override public void lastBytesRead(int bytes) { lastBytesRead = bytes; if (bytes > 0) { totalBytesRead += bytes; } }❻读取消息的数量+1
@Override public final void incMessagesRead(int amt) { totalMessages += amt; }❼触发channelRead事件任务链
任务链处理完成后ByteBuf会被回收
❽判断是否继续读取
@Override public boolean continueReading() { return continueReading(defaultMaybeMoreSupplier); } private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { @Override public boolean get() { return attemptedBytesRead == lastBytesRead; } }; @Override public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0; }attemptedBytesRead == lastBytesRead 代表期望读取的字节数==实际读取的字节数,如果确实相等,则证明还需要继续读取数据
totalMessages < maxMessagePerRead maxMessagePerRead的值初始化时为16。举例: 如果数据大小为17*1024个字节,那么整个读取操作循环16次,也就是总共读取16*1024个字节
❾读取完成
@Override public void readComplete() { record(totalBytesRead()); } //totalBytesRead是循环读取完成后,总的读取的字节数 protected final int totalBytesRead() { return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead; } //设置下次读取的时候nextReceiveBufferSize的大小 private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) { if (decreaseNow) { index = max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } }这时候理解了为什么INDEX_INCREMENT=4 因为在 maxMessagePerRead=16=1<<4 ,所以当SIZE_TALBE[index]>=512时,上次的读取最多读取16*SIZE_TALBE[index]个字节,而SIZE_TALBE[index+INDEX_INCREMENT] = 16*SIZE_TALBE[index]
