第五节 netty源码分析-ChannelPipeline的addLast方法解析

一、概述

在第四节中,分析了ServerBootstrap的bind()方法,主要分析了initAndRegister方法,这个方法调用的层次很深,还有几个主要的地方没有讲到,其中在initAndRegister()调用的ServerBootstrap.init()方法里面会用pipeline.addlast()方法。前面分析过new NioServerSocketChannel是默认创建DefaultChannelPipeline。所以就看io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler…)方法

二、ChannelPipeline 的addLast

 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            //executor=null
            //name =null
            //h=new ChannelInitializer<Channel> 上一节分析提到的特殊handler
            addLast(executor, null, h);
        }

        return this;
    }

addLast(executor, null, h);源码

     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        //group=null
        //name =null
        //handler=new ChannelInitializer<Channel>
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //检查添加
            checkMultiplicity(handler);

            //filterName(name, handler)   当我们没有指定名字时  给我们默认生成一个
            //new DefaultChannelHandlerContext()
            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                //判断handlerState属性等于0  并且设置为1
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            //返回NioEvenGroup
            EventExecutor executor = newCtx.executor();
            //如果当前线程跟eventLoop保存的线程变量不是同一个就进入callHandlerAddedInEventLoop方法
            //只做一件事情就是callHandlerAdded0(newCtx)
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
         
        callHandlerAdded0(newCtx);
        return this;
    }

callHandlerAdded0(newCtx) 方法里面很简单就是调用io.netty.channel.AbstractChannelHandlerContext#callHandlerAdded()这个方法也很简单就是调用一个

handlerAdded()因为分析到这里的handler是ChannelInitializer,所以就看ChannelInitializer对handlerAdded的实现

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            //对于当前的DefaultChannelPipeline实现,这应该是可以进来的
            //这里就调用的initChannel方法,如果initChannel方法执行完就会将ChannelInitializer
            //从pipeline中移除调
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }

initChannel(ctx)

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            //我们写的ChannelInitializer.initChannel的方法 将在这里被调用
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            //删除此节点
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

所以就会到了ServerBoostrap.init()方法里面的定义的匿名内部类

   p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                //System.out.println(ch==channel);   true
                final ChannelPipeline pipeline = ch.pipeline();
                //System.out.println(pipeline==p);  true
                //config.handler()=自己创建的new ChannelInitializer<ServerSocketChannel>()
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                //这里的ch还是NioServerSocketChannel。从里面获取到bossGroup线程,然后启动线程
                //ch.eventLoop().execute()就是 SingleThreadEventExecutor.execute()
                //SingleThreadEventExecutor.execute()才会启动正在的启动线程去监听客户端的socket链接
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
//                        System.out.println("执行了");
                        //bossGroup将客户端连接转交给workerGroup
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

三、总结

ChannelInitializer是一个特殊的handler,pipeline在添加完之后会将这个handler移除掉。pipeline可以理解成是一个链表结构,里面有head头结点,tail尾结点。当有数据进来的时候,就会从头结点的handler开始处理。如果有数据写出去的话,就会从尾结点开始处理。


版权声明:本文为Resee_Z原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。