文章目录
一、ServerBootstrap 启动过程
1. 创建线程模型
该步骤也就是创建 Netty 主从 Reactor 线程模型对应的线程池 NioEventLoopGroup,一个为主 Reactor ,一般设置为一个线程,处理连接请求,一个为从 Reactor 根据业务需求设置线程数,处理 IO任务并发回给客户端,具体初识话源码可以看我另一篇博客,线程池的初始化包括了初始化线程,开启 Selector并注册到线程上,优化底层数据结构等待。
2. doBind() 绑定端口并做一些初始化
该步基本是 Netty 服务器端启动的最后一步,绑定端口并做一些初始化操作,大致源码如下所示,主要包括两个方法:
- initAndRegister():创建并绑定 channel 到对应的 Selector 上;
- doBind0():绑定端口并且设置监听连接事件;
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化 channel 并注册到对应的 NioEventLoop 线程上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 判断注册是否发生异常
if (regFuture.cause() != null) {
return regFuture;
}
// 如果注册完成
if (regFuture.isDone()) {
// 创建一个异步任务去绑定端口
ChannelPromise promise = channel.newPromise();
// 绑定端口以及之后的一些操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 因为注册 channl 通过 future 的异步任务来实现所以没有成功要即时传递给主线程
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
// 添加一个注册的监听器,监听注册任务
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
// 如果失败响应主线程
if (cause != null) {
promise.setFailure(cause);
} else {
// 成功则进入绑定逻辑
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
2.1 initAndRegister()
通过反射机制创建一个 channel,使用反射是为了可以提高扩展性,创建适合的 channel 。创建完成后赋值给 AbstractNioChannel 的 ch 属性,将 Netty 的 Channel 和 NIO 的channel 整合。然后进行初始化,设置上下文和一些参数,成功之后就可以进行注册,调用 AbtractNioChannel 中 unsafe 的 doRegister 进行注册,注册到对应的 Selector 选择器上,返回一个 SelectionKey,来保存各种参数,例如,感兴趣的事件集合,准备就绪的事件集合,对应的channel Selector 等。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/**
* 通过反射创建一个 channel 对象,目的是为了可以为开发者可以定制自己希望使用的 channel
* constructor.newInstance();
* 并赋值给 AbstractNioChaneel 的 SelectableChannel ch; 属性
*/
channel = channelFactory.newChannel();
/**
* 初始化创建的 channel
* 包括设置 options 参数信息,例如使用长连接,禁用 Nagle 等
* 还有放置 attr 上下文信息
*/
init(channel);
} catch (Throwable t) {
// 如果初始化失败,创建一个异步任务返回异常
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
/**
* 将创建的 Channel 注册到对应线程的选择器上,
* 最终会经过层层封装调用 AbstractNioChannel 中的 unsafe 类的 doRegister 方法
* 将 channel 注册到选择器上并返回一个 selectionKey
*/
ChannelFuture regFuture = config().group().register(channel);
// 注册异常的处理
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
/**
* 如果你自己看源码,会发现这里有一段很长的注释,大致意思是:
* (1)在注册之后可以安全的调用 bind() 和 connect() 进行绑定和连接
* (2)这里会涉及线程的切换,ServerBootStrap 运行在主线程,
* 而 register() 运行在 EventLoop 线程
*/
return regFuture;
}
2.2 doBind0()
会调用 dobind0 绑定对应的端口号和主机。绑定成功之后就会注册感兴趣的OP_ACCEPT 连接事件,接受客户端发起的连接请求。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 通过eventLoop 线程去执行绑定 localAddress 的任务
// 并添加一个监听器监听错误关闭
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
// 触发 active 事件,也就是监听 OP_ACCEPT 连接事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
二、Netty 对于 IO 事件的处理
1. OP_ACCEPT 连接事件
Netty 服务端对于连接请求的处理主要分为以下三步:
- 当 NioEventLoop 的 Selector 轮询到有就绪的 IO 事件时,会检查类型是否是 OP_ACCEPT ,如果是,会取出对应的 attachment 附件,也就是对应的 NioServerSocketChannel 本身,调用它的内部辅助类 unsafe 来进行一个数据的读取操作‘’
- 读取到数据之后,会调用 ServerSocketChannel 的 accept() 方法建立连接并且创建一个 NioSocketChannel 对象传送到 pipline 中进行处理。
- 在 pipline 中其中有一个 handler 就是 ServerBootstrapAcceptor,该 Acceptor 会将传递来的 NioSocketChannel 注册到从 Reactor 线程池中的某一个 EventLoop 的Selector 选择器上,并且为添加感兴趣事件以及其绑定 handler 执行链。
2. OP_READ 读取事件
对于读取操作的流程,大致分为两步:
- 选择器轮询到READ操作的 SelectionKey ,取出对应的附件,利用辅助类 unsafe 的 read 操作,循环读入数据到 ByteBuf 中;
- 然后放入到 pipline 中通过 handler 进行处理,经过解码和一些列 handler 的处理,得到对应的可读取的 ByteBuf ;
对于第一步的循环读取,在另一篇博客里已经详细说明了,这一次就来说在 pipline 的解码操作,用户选择的解码器,除了使用 MessageToMessageCodec 的子类外,都会经过 ByteToMessageDecoder 抽象类的 channelRead() 方法,以下就来解读一番。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 创建一个解码后的消息列表
CodecOutputList out = CodecOutputList.newInstance();
try {
// 将传递的数据转型为 ByteBuf
ByteBuf data = (ByteBuf) msg;
// 判断是否是第一次解码操作
first = cumulation == null;
if (first) {
// 是的话,将数据直接赋值给堆积容器
cumulation = data;
} else {
// 如果不是,则需要将数据写入堆积容器
// 需要维护读写指针,判断扩容等
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
/**
* 具体的解码操作,循环对可读数据进行解码,
* 期间会有一些合法性判断,如果不合法则停止操作,
* 主要的解码操作在:
* decodeRemovalReentryProtection(ctx, in, out); 方法中的
* decode(ctx, in, out); 方法,由子类重写来进行解码操作
*/
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {// 解码完的收尾操作
// 如果字节堆积容器不为空,并且不可读,则需要释放资源
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
// 如果读取的次数,大于配置的次数,则需要继续读取
} else if (++ numReads >= discardAfterReads) {
// 设置读取次数为 0
numReads = 0;
// 清除掉一些读取过的字节,
// 防止一直读入造成 OOM
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
// pipline 放行到下一个 handler
fireChannelRead(ctx, out, size);
// 回收消息集合
out.recycle();
}
} else {
// 如果不是 ByteBuf 类型,则直接放行
ctx.fireChannelRead(msg);
}
}
3. OP_READ 读取事件(二)
既然读取到了对应的数据,一般情况要对于请求进行响应,写出客户端需要结果,这一节就说明读取数据之后,如何写回数据给客户端。
Netty 在进行数据的写操作时,为不阻塞运行,会包装为一个 writeTask 或者 writeAndFlushTask 放入到任务队列中,由线程去进行完成。
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 一系列合法性判断
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// 获取对应的输出 handler
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
// 得到对应处理线程
EventExecutor executor = next.executor();
// 如果线程是 EventLoop 线程,则执行写操作,通过 flush 标志判断写完是否刷新
if (executor.inEventLoop()) {
/**
* 对于真正的写数据操作
* 就是先经过 encode 编码得到编码后的字节数据
* 然后写入到 ByteBuf 字节容器中
* 最后会调用到 NioSocketChannel 的 doWrite() 方法进行写入与刷新
*/
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 如果不是 EventLoop 线程
// 则封装为一个写操作任务
final AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
// 将该任务加入到线程组的任务队列中
if (!safeExecute(executor, task, promise, m, !flush)) {
// 如果加入失败,则取消该任务,类似拒绝策略
task.cancel();
}
}
}