前言
阅读netty源码之前建议先了解下netty相关的线程模型,推荐Doug Lea的Scalable IO in Java。并且熟悉java NIO的基本使用。
启动流程
下面是一个常见的netty启动方法:
public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(8); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new ServerHandler()); } }); ChannelFuture channelFuture = bootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
首先创建两个循环线程组:bossGroup和workerGroup 。这是netty中两个执行任务的循环线程组,bossGroup负责处理accept事件,workerGroup负责处理read\write事件。这里bossGroup为单线程,也是通常的做法,除非你想开启多个netty监听端口;workerGroup一般会创建多个线程来处理事件。循环线程组中的每个线程(EventLoop)内部都会有自己的多路复用器Selector,bossGroup内部的Selector处理客户端连接事件,然后把客户端连接过来的socket注册到workerGroup中某个线程的Selector上,并绑定read\write事件。
下面先看EventLoop的创建过程:
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
//这里会调用一些列构造方法以及父类的构造方法,最后来到父类MultithreadEventExecutorGroup的构造方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//事件执行器,对应循环线程组中每一个循环线程,可以理解为真正干活的线程,这里为一个数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//依次为数组中的每个执行器创建循环线程
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//创建失败则关闭之前创建的线程
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//创建循环线程组中的线程选择器,可以理解为负载均衡,netty会轮询选择group中的一个线程执行任务,当线程数为2的次幂和不是2的次幂是使用不同的策略,主要因为是2的次幂时可以用按位与替代取模
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}这里主要看 children[i] = newChild(executor, args); 这个核心方法,一路调用来到
NioEventLoopGroup
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//这里的SelectorProvider用来创建多路复用器Selector,因操作系统而易,linux对应epoll
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}可以看到这里的循环线程为NioEventLoop,这个循环线程是EventLoopGroup中执行任务的核心类,netty主要的逻辑都在这个类及其父类中。
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
看一下类图:

可以看到这个类继承了SingleThreadEventLoop,说明这是一个单线程的EventLoop,这也是netty高性能的一点,一个线程循环执行任务,没有上下文切换和资源竞争,类比redis的单线程模型。不过一般中间件在集成netty的时候都把这里的EventLoop当作是IO线程使用,EventLoop的handler接收客户端的读写事件,然后把任务放到业务线程池中执行。顶级接口为Executor,说明它可以像线程池一样工作,当然这是一个单线程的线程池。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
//创建多路复用器Selector
final SelectorTuple selectorTuple = openSelector();
//优化了的Selector,其实是对NIO Selector的包装,默认不会包装
selector = selectorTuple.selector;
//unwrappedSelector 为NIO原生selector
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}这里还会调用数个父类的构造方法,看一个主要的:
SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
//队列中最大等待任务数,默认为Integer.MAX
this.maxPendingTasks = Math.max(16, maxPendingTasks);
//每个NIOEventLoop对应的执行任务的执行器
this.executor = ThreadExecutorMap.apply(executor, this);
//内部阻塞队列,用来接收非IO任务,如注册channel
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
//看一下执行器的创建
ThreadExecutorMap
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
//其实就是做了一次包装,NIOEventLoop的执行器在执行execute方法时会把任务放到队列中,并开启当前循环线程,也就是开始netty主流程,即执行SingleThreadEventExecutor的run方法。bossGroup和workerGroup创建完毕,接下来看ServerBootstrap创建和初始化:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//父类方法,把bossGroup赋值给ServerBootstrap的group
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
//把workerGroup赋值给childGroup
this.childGroup = childGroup;
return this;
}public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}这里的channel(NioServerSocketChannel.class)方法,通过参数class类拿到对应SocketChannel的构造方法,便于后面为ServerBootstrap创建channel。服务端对应的当然是NioServerSocketChannel。
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}childHandler方法表示的是workerGroup中的channel的handler,也就是处理读写事件的handler。
最后:
bootstrap.bind(8080) //绑定端口,并开启netty服务
下面进入netty启动的主流程:
直接看doBind()方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}这里几乎全部逻辑都在第一行代码,下面主要分析initAndRegister():
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//创建nioServerSocketChannel
channel = channelFactory.newChannel();
//channel的初始化工作
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//注册channel,即把channel注册到多路复用器Selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}这里有三个核心方法:
channelFactory.newChannel()
init(channel)
config().group().register(channel)
下面依次分析这三个方法:
channelFactory.newChannel()拿到之前缓存的构造方法创建NioServerSocketChannel:
public NioServerSocketChannel() {
//newSocket创建nio原生channel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
//父类
super(null, channel, SelectionKey.OP_ACCEPT);
//channel相关配置属性,里面维护了jdk的nio channel
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//最后到达
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
//netty自己实现的unsafe类,主要用来操作直接内存,注册channel等
unsafe = newUnsafe();
//创建ChannelPipeline
pipeline = newChannelPipeline();
}后面还会继续调用父类构造方法,这里详细看了,主要就是创建了每条channel对应的ChannelPipeline,并且设置socketChannel对ACCEPT事件感兴趣。
首先看下两个类的概念:
【ChannelHandlerContext】:
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。
【ChannelPipline】:
保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。
ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。
在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应。
盗个图吧哈哈:

一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。
read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。
创建channel之后,接下来看初始化操作init(channel):
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
/*以上操作主要时一些赋值操作,设置options,attrs,socket的一些参数相关*/
//往pipeline中添加一个服务端channel对应handler,注意与childHandler的区别,这里为
//ChannelInitializer,ChannelInitializer是一个特殊的handler,在注册channel时,
//initChannel方法会被执行,并执行真正的添加handler的方法,完成使命后
//ChannelInitializer会被删掉
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
//这个handler 通常为null下面的if不会执行
if (handler != null) {
pipeline.addLast(handler);
}
//把添加handler的任务添加到这个channel对应的eventloop中的队列中,等待执行
//当然这时还不会执行,而是在注册channel的时候执行
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}上面方法中的ServerBootstrapAcceptor是属于ServerSocketChannel的handler,又netty自行添加,SocketChannel的handler由程序员添加,就像一开始写的那样:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new ServerHandler());
}
});
这里不难猜到,ServerBootstrapAcceptor这个handler的作用其实就是注册netty客户端连过来的channel,也就是把客户端channel注册到workerGroup中的某个线程的Selector上。
最后看config().group().register(channel)这个方法,这是netty注册ServerSocketChannel方法:
会拿到一个eventloop,服务端通常只有一个,然后执行register方法:
public ChannelFuture register(Channel channel) {
//把channel封装成Promise,可以理解为Future
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//这里是真正的注册方法
promise.channel().unsafe().register(this, promise);
return promise;
}
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
//判断是否在当前eventLoop 线程内,这里还是启动线程,当然不在,走else
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//依然是把注册任务扔进eventLoop的队列中
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
这里说一下eventLoop中的单线程模型,每个eventLoop都会有一个线程在循环工作,这在后面netty工作主流程源码中会详细讲到,主要就是监听Selector,也就是epoll上的就绪队列(后面会讲到),如果客户端有事件到来就处理,我们把这些任务称作IO任务;除此之外还有非IO任务,也就是eventLoop队列中的任务,eventLoop终其一生都在不断的执行IO任务-->非IO任务-->IO任务-->...因此称作事件循环线程eventLoop。eventLoop.execute()不仅会把任务添加到等待队列中,还会开启当前eventLoop线程,开始循环工作:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
//添加任务到队列
addTask(task);
if (!inEventLoop) {
//开启当前EventLoop线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}最后来到doStartThread()--->SingleThreadEventExecutor.this.run()开始eventLoop主流程
目前为止,netty启动源码差不多结束了,还差一个注册ServerSocketChannel没讲,下面看下注册逻辑,注册逻辑会在任务队列中等待执行:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}标记一下几个主要方法:
doRegister()
pipeline.invokeHandlerAddedIfNeeded()
pipeline.fireChannelRegistered();
首先看 doRegister():
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//熟悉nio的小伙伴应该不陌生,这里就是注册java原生channel的地方
//javaChannel()拿到ServerSocketChannel,注册到Selector上
//这里的最后一个参数为attchment,传进去的是this,也就是当前netty自己实现
//的Channel,通过k.attach(att),绑定到java的ServerSocketChannel
//通过这个实现ServerSocketChannel到NioServerSocketChannel数据的流通
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}至于后面的两个方法,其实都不会执行,这涉及到Handler生命周期方法的执行,有下面一众方法:
* handler的生命周期回调接口调用顺序: * handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete * -> channelInactive -> channelUnRegistered -> handlerRemoved * * handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调; * channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。 * channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。 * channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读; * channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕; * channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。 * channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程; * handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。
因为对应服务端的Handler是ServerBootstrapAcceptor,而它只有一个channelRead方法。对于客户端的Handler就要看自定义的实现了。对于ServerBootstrapAcceptor的channelRead方法,会在netty工作主流程源码中详细分析。
netty启动的核心源码到此结束,下一篇为netty架构的精髓:netty工作主流程源码。
ps:由于笔者水平有限,如有错误,请不吝指出!