PooledByteBuf分配及回收之十六AdaptiveRecvByteBufAllocator源码解析

AdaptiveRecvByteBufAllocator 决定了要分配的ByteBuf的容量。

RecvByteBufAllocator

public interface RecvByteBufAllocator {
    
    //创建一个新的句柄。句柄提供实际操作并保留内部信息,这是预测最佳缓冲区容量所必需的。
    Handle newHandle();

    
    @Deprecated
    interface Handle {
        //创建一个新的接收缓冲区,其容量可能足够大以读取所有入站数据,而又很小以不浪费其空间。
        ByteBuf allocate(ByteBufAllocator alloc);

        //与{@link #allocate(ByteBufAllocator)}类似,除了它不分配任何内容,仅告诉容量。
        int guess();

        //重置所有已累积的计数器,并建议下一个读取循环应读取多少消息/字节。
        //{@link #continueReading()}可以使用它来确定读取操作是否应该完成。(这只是一个提示,可能会被实现忽略。)
        void reset(ChannelConfig config);

        //增加当前读取循环已读取的消息数。
        void incMessagesRead(int numMessages);

        //设置上一次读取操作已读取的字节。这可用于增加已读取的字节数。
        void lastBytesRead(int bytes);

        //获取上一次读取操作的字节数。
        int lastBytesRead();

        //设置读取操作将尝试读取或读取的字节数。
        void attemptedBytesRead(int bytes);

        //获取读取操作将(或确实)尝试读取的字节数。
        int attemptedBytesRead();

        //确定当前的读取循环是否应该继续。
        boolean continueReading();

        //读取已完成。
        void readComplete();
    }

    @SuppressWarnings("deprecation")
    @UnstableApi
    interface ExtendedHandle extends Handle {
        //与{@link Handle#continueReading()}相同,只是“更多数据”由供应商参数确定。
        boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier);

    }

}

 

MaxMessagesRecvByteBufAllocator

public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
    //返回每个读取循环要读取的最大消息数。
    int maxMessagesPerRead();
    //设置每个读取循环读取的最大消息数。如果此值大于1,则事件循环可能会尝试多次读取以获取多条消息。
    MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
    
}

 

DefaultMaxMessagesRecvByteBufAllocator


public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
    //默认为1
    private volatile int maxMessagesPerRead;
    //默认为true
    private volatile boolean respectMaybeMoreData = true;
    //构造方法
    public DefaultMaxMessagesRecvByteBufAllocator() {
        this(1);
    }
    //构造方法
    public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
        maxMessagesPerRead(maxMessagesPerRead);
    }

    @Override
    public int maxMessagesPerRead() {
        return maxMessagesPerRead;
    }

    @Override
    public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
        checkPositive(maxMessagesPerRead, "maxMessagesPerRead");
        this.maxMessagesPerRead = maxMessagesPerRead;
        return this;
    }

   

    public DefaultMaxMessagesRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        this.respectMaybeMoreData = respectMaybeMoreData;
        return this;
    }

    
    public final boolean respectMaybeMoreData() {
        return respectMaybeMoreData;
    }

}

静态内部类 MaxMessageHandle

    public abstract class MaxMessageHandle implements ExtendedHandle {

        private ChannelConfig config;
        //每次循环读取的最大消息数
        private int maxMessagePerRead;
        //循环读取的消息总数
        private int totalMessages;
        //循环读取的字节总数
        private int totalBytesRead;
        //每次循环期望读取的字节数
        private int attemptedBytesRead;
        //上次循环读取的字节数
        private int lastBytesRead;
        //默认为true
        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;

        //一个Boolean提供商
        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                //如果attemptedBytesRead == lastBytesRead 那么证明可能还有数据待读取
                return attemptedBytesRead == lastBytesRead;
            }
        };

        /**
         * 重置
         */
        @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }

        //分配一个读取的ByteBuf
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }

        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }

        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }

        @Override
        public final int lastBytesRead() {
            return lastBytesRead;
        }

        @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }
        
        //这里逻辑比较重要,决定了读取循环是否继续执行
        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }

        @Override
        public void readComplete() {
        }

        @Override
        public int attemptedBytesRead() {
            return attemptedBytesRead;
        }

        @Override
        public void attemptedBytesRead(int bytes) {
            attemptedBytesRead = bytes;
        }

        protected final int totalBytesRead() {
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        }
    }

 

AdaptiveRecvByteBufAllocator

静态属性

    //最小容量
    static final int DEFAULT_MINIMUM = 64;
    //初始容量
    static final int DEFAULT_INITIAL = 1024;
    //最大容量
    static final int DEFAULT_MAXIMUM = 65536;

    //SIZE_TABLE下标增量
    private static final int INDEX_INCREMENT = 4;
    //SIZE_TABLE下标减量
    private static final int INDEX_DECREMENT = 1;
    //容量数组 大小为53
    private static final int[] SIZE_TABLE;

SIZE_TABLE的初始化

    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        //元素均为16的倍数,共31个
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }
        // 512 = 1 << 9 , 直到 1 << 30 共22个 元素均为2的指数
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

属性

    //计算后为3
    private final int minIndex;
    //计算后为38
    private final int maxIndex;
    //默认=DEFAULT_INITIAL=1024
    private final int initial;

构造方法

    public AdaptiveRecvByteBufAllocator() {
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        checkPositive(minimum, "minimum");
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }

        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }

        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }    

    //使用二分法查找合适的SIZE_TABLE下标
    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }
            //这里注意一下,+比>>>的优先级高
            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                //进入这里的条件是 a < size <= b
                return mid + 1;
            }
        }
    }

其它方法实现

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    @Override
    public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    }

内部类HandleImpl


    private final class HandleImpl extends MaxMessageHandle {
        //最小容量下标 3 代表容量为64
        private final int minIndex;
        //最大容量下标 38 代表容量为65536
        private final int maxIndex;
        //初始容量下标 32 代表容量为1024
        private int index;
        //下一个接收缓冲区大小 初始值是1024
        private int nextReceiveBufferSize;
        //立即减少 初始值false
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            index = getSizeTableIndex(initial);
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

	    /**
	     * 如果我们阅读的内容与要求的一样多,则应检查是否需要增加下一个猜测的大小。
	     * 这有助于在挂起大量数据时更快地进行调整,并且可以避免返回选择器以检查更多数据。返回选择器可能会为大型数据传输增加大量的延迟。
	     */
        @Override
        public void lastBytesRead(int bytes) {
            
            if (bytes == attemptedBytesRead()) {
                record(bytes);
            }
            super.lastBytesRead(bytes);
        }

        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }

        private void record(int actualReadBytes) {

            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

        @Override
        public void readComplete() {
            record(totalBytesRead());
        }
    }

 

AbstractNioByteChannel内部类NioByteUnsafe 使用 AdaptiveRecvByteBufAllocator

@Override
        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();❶
            allocHandle.reset(config);❷

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);❸
                    allocHandle.lastBytesRead(
                    	doReadBytes(byteBuf)❹
                    );❺
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);❻
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);❼
                    byteBuf = null;
                } while (allocHandle.continueReading());❽

                allocHandle.readComplete();❾
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

❶ 获取一个RecvByteBufAllocator.Handle

        @Override
        public RecvByteBufAllocator.Handle recvBufAllocHandle() {
            if (recvHandle == null) {
                recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }

        @Override
        public Handle newHandle() {
            return new HandleImpl(minIndex, maxIndex, initial);
        }

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            index = getSizeTableIndex(initial);
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

❷对RecvByteBufAllocator.Handle进行重置

        @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            //maxMessagePerRead = 16
            maxMessagePerRead = maxMessagesPerRead();
            //清零
            totalMessages = totalBytesRead = 0;
        }

❸ 分配一个ByteBuf

        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }

        @Override
        public int guess() {
            //nextReceiveBufferSize默认为1024,在第❶步已进行初始化
            return nextReceiveBufferSize;
        }

❹读取数据到ByteBuf

    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        //设置要读取的字节数量
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        //读取,并返回实际读取的字节数量
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

❺设置本次读取的字节数量

        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }

❻读取消息的数量+1

        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }

❼触发channelRead事件任务链

任务链处理完成后ByteBuf会被回收

❽判断是否继续读取

        @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }

        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                return attemptedBytesRead == lastBytesRead;
            }
        };

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }

 attemptedBytesRead == lastBytesRead 代表期望读取的字节数==实际读取的字节数,如果确实相等,则证明还需要继续读取数据

totalMessages < maxMessagePerRead  maxMessagePerRead的值初始化时为16。举例: 如果数据大小为17*1024个字节,那么整个读取操作循环16次,也就是总共读取16*1024个字节

❾读取完成

        @Override
        public void readComplete() {
            record(totalBytesRead());
        }
        //totalBytesRead是循环读取完成后,总的读取的字节数
        protected final int totalBytesRead() {
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        }

        //设置下次读取的时候nextReceiveBufferSize的大小
        private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

这时候理解了为什么INDEX_INCREMENT=4  因为在 maxMessagePerRead=16=1<<4 ,所以当SIZE_TALBE[index]>=512时,上次的读取最多读取16*SIZE_TALBE[index]个字节,而SIZE_TALBE[index+INDEX_INCREMENT] = 16*SIZE_TALBE[index]

 

 

 


版权声明:本文为AnY11原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。