Netty 读写请求源码分析

一、ServerBootstrap 启动过程

1. 创建线程模型

该步骤也就是创建 Netty 主从 Reactor 线程模型对应的线程池 NioEventLoopGroup,一个为主 Reactor ,一般设置为一个线程,处理连接请求,一个为从 Reactor 根据业务需求设置线程数,处理 IO任务并发回给客户端,具体初识话源码可以看我另一篇博客,线程池的初始化包括了初始化线程,开启 Selector并注册到线程上,优化底层数据结构等待。

2. doBind() 绑定端口并做一些初始化

该步基本是 Netty 服务器端启动的最后一步,绑定端口并做一些初始化操作,大致源码如下所示,主要包括两个方法:

  • initAndRegister():创建并绑定 channel 到对应的 Selector 上;
  • doBind0():绑定端口并且设置监听连接事件;
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 初始化 channel 并注册到对应的 NioEventLoop 线程上
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        // 判断注册是否发生异常
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // 如果注册完成
        if (regFuture.isDone()) {
            // 创建一个异步任务去绑定端口
            ChannelPromise promise = channel.newPromise();
            // 绑定端口以及之后的一些操作
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // 因为注册 channl 通过 future 的异步任务来实现所以没有成功要即时传递给主线程
            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
            // 添加一个注册的监听器,监听注册任务
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    // 如果失败响应主线程
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        // 成功则进入绑定逻辑
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

2.1 initAndRegister()

通过反射机制创建一个 channel,使用反射是为了可以提高扩展性,创建适合的 channel 。创建完成后赋值给 AbstractNioChannel 的 ch 属性,将 Netty 的 Channel 和 NIO 的channel 整合。然后进行初始化,设置上下文和一些参数,成功之后就可以进行注册,调用 AbtractNioChannel 中 unsafe 的 doRegister 进行注册,注册到对应的 Selector 选择器上,返回一个 SelectionKey,来保存各种参数,例如,感兴趣的事件集合,准备就绪的事件集合,对应的channel Selector 等。

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            /**
             * 通过反射创建一个 channel 对象,目的是为了可以为开发者可以定制自己希望使用的 channel
             * constructor.newInstance();
             * 并赋值给 AbstractNioChaneel 的 SelectableChannel ch; 属性
             */
            channel = channelFactory.newChannel();
            /**
             * 初始化创建的 channel
             * 包括设置 options 参数信息,例如使用长连接,禁用 Nagle 等
             * 还有放置 attr 上下文信息
             */
            init(channel);
        } catch (Throwable t) {
            // 如果初始化失败,创建一个异步任务返回异常
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        /**
         * 将创建的 Channel 注册到对应线程的选择器上,
         * 最终会经过层层封装调用 AbstractNioChannel 中的 unsafe 类的 doRegister 方法
         * 将 channel 注册到选择器上并返回一个 selectionKey
         */
        ChannelFuture regFuture = config().group().register(channel);
        // 注册异常的处理
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        /**
         * 如果你自己看源码,会发现这里有一段很长的注释,大致意思是:
         *  (1)在注册之后可以安全的调用 bind() 和 connect() 进行绑定和连接
         *  (2)这里会涉及线程的切换,ServerBootStrap 运行在主线程,
         *      而 register() 运行在 EventLoop 线程
         */
        return regFuture;
    }

2.2 doBind0()

会调用 dobind0 绑定对应的端口号和主机。绑定成功之后就会注册感兴趣的OP_ACCEPT 连接事件,接受客户端发起的连接请求。

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    // 通过eventLoop 线程去执行绑定 localAddress 的任务
                    // 并添加一个监听器监听错误关闭
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
			// 触发 active 事件,也就是监听 OP_ACCEPT 连接事件
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

二、Netty 对于 IO 事件的处理

1. OP_ACCEPT 连接事件

Netty 服务端对于连接请求的处理主要分为以下三步:

  • 当 NioEventLoop 的 Selector 轮询到有就绪的 IO 事件时,会检查类型是否是 OP_ACCEPT ,如果是,会取出对应的 attachment 附件,也就是对应的 NioServerSocketChannel 本身,调用它的内部辅助类 unsafe 来进行一个数据的读取操作‘’
  • 读取到数据之后,会调用 ServerSocketChannel 的 accept() 方法建立连接并且创建一个 NioSocketChannel 对象传送到 pipline 中进行处理。
  • 在 pipline 中其中有一个 handler 就是 ServerBootstrapAcceptor,该 Acceptor 会将传递来的 NioSocketChannel 注册到从 Reactor 线程池中的某一个 EventLoop 的Selector 选择器上,并且为添加感兴趣事件以及其绑定 handler 执行链。

2. OP_READ 读取事件

对于读取操作的流程,大致分为两步:

  • 选择器轮询到READ操作的 SelectionKey ,取出对应的附件,利用辅助类 unsafe 的 read 操作,循环读入数据到 ByteBuf 中;
  • 然后放入到 pipline 中通过 handler 进行处理,经过解码和一些列 handler 的处理,得到对应的可读取的 ByteBuf ;

对于第一步的循环读取,在另一篇博客里已经详细说明了,这一次就来说在 pipline 的解码操作,用户选择的解码器,除了使用 MessageToMessageCodec 的子类外,都会经过 ByteToMessageDecoder 抽象类的 channelRead() 方法,以下就来解读一番。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            // 创建一个解码后的消息列表
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                // 将传递的数据转型为 ByteBuf
                ByteBuf data = (ByteBuf) msg;
                // 判断是否是第一次解码操作
                first = cumulation == null;
                if (first) {
                    // 是的话,将数据直接赋值给堆积容器
                    cumulation = data;
                } else {
                    // 如果不是,则需要将数据写入堆积容器
                    // 需要维护读写指针,判断扩容等
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                /**
                 * 具体的解码操作,循环对可读数据进行解码,
                 * 期间会有一些合法性判断,如果不合法则停止操作,
                 * 主要的解码操作在:
                 *  decodeRemovalReentryProtection(ctx, in, out); 方法中的
                 *      decode(ctx, in, out); 方法,由子类重写来进行解码操作
                 */
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);

            } finally {// 解码完的收尾操作
                // 如果字节堆积容器不为空,并且不可读,则需要释放资源
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                    // 如果读取的次数,大于配置的次数,则需要继续读取
                } else if (++ numReads >= discardAfterReads) {
                    // 设置读取次数为 0
                    numReads = 0;
                    // 清除掉一些读取过的字节,
                    // 防止一直读入造成 OOM
                    discardSomeReadBytes();
                }
                int size = out.size();
                firedChannelRead |= out.insertSinceRecycled();
                // pipline 放行到下一个 handler
                fireChannelRead(ctx, out, size);
                // 回收消息集合
                out.recycle();
            }
        } else {
            // 如果不是 ByteBuf 类型,则直接放行
            ctx.fireChannelRead(msg);
        }
    }

3. OP_READ 读取事件(二)

既然读取到了对应的数据,一般情况要对于请求进行响应,写出客户端需要结果,这一节就说明读取数据之后,如何写回数据给客户端。

Netty 在进行数据的写操作时,为不阻塞运行,会包装为一个 writeTask 或者 writeAndFlushTask 放入到任务队列中,由线程去进行完成。

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // 一系列合法性判断
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        // 获取对应的输出 handler
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        // 得到对应处理线程
        EventExecutor executor = next.executor();
        // 如果线程是 EventLoop 线程,则执行写操作,通过 flush 标志判断写完是否刷新
        if (executor.inEventLoop()) {
            /**
             * 对于真正的写数据操作
             * 就是先经过 encode 编码得到编码后的字节数据
             * 然后写入到 ByteBuf 字节容器中
             * 最后会调用到 NioSocketChannel 的 doWrite() 方法进行写入与刷新
             */
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            // 如果不是 EventLoop 线程
            // 则封装为一个写操作任务
            final AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
            // 将该任务加入到线程组的任务队列中
            if (!safeExecute(executor, task, promise, m, !flush)) {
                // 如果加入失败,则取消该任务,类似拒绝策略
                task.cancel();
            }
        }
    }

版权声明:本文为weixin_48922154原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。