一.IO与NIO
传送门:https://blog.csdn.net/qq_40772692/article/details/100302360
二.Netty框架简介
1.什么是Netty
Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
操作系统底层是支持异步I/O通信的,但是java传统的IO并没有提供异步IO通信的类库,导致java通信长时间都采用了同步阻塞模式(BIO),因此很多大型服务器大多都采用的c/c++开发,这才导致了java NIO的出现,但是NIO开发复杂,存在漏洞而且是同步非阻塞IO。简而言之:Netty相当于是对NIO的封装和优化而来的框架,是一个完全异步非阻塞IO框架, 通过主从Reactor模型来实现的。可以让我们忽略底层复杂的实现,更快速高效的开发出健壮稳定的通信网络应用。
2.Netty框架开发环境搭建
(1)官网下载netty4的最新版jar包:https://netty.io/
(2)新建普通Java 工程,起名为NettyTest,并新建文件夹lib用于导入jar包
(3)导入下载完毕的jar包,注意,这里下载的jar包分为两类,一类是为了方便学习和使用所提供的netty所有组件单独的jar包;另一类是包含所有组件的综合jar包:在all-in-one文件夹下的netty-all-4.1.39.Final.jar这个jar包,我们导入这个即可。

导入方法:将jar包复制到lib文件夹下,但是此时还不能使用。右击netty-all-4.1.39.Final.jar,点击Build Path->add to Build Path,之后会出现如下目录

环境部署完成,可以书写Netty程序了
三.Netty的核心组件
1.EventLoop和EventLoopGroup
EventLoop可以看作是一个Reactor线程,其内部封装了select,用来对注册事件进行轮询来分离事件,并且充当分发(Dispatcher)的角色,将事件分配给相应的一组handler。而EventLoopGroup是EventLoop的数组,即Reactor线程池,内部维护了一组EventLoop线程。
(1)服务器端:在 Netty 服务器端编程中我们需要 BossEventLoopGroup 和WorkerEventLoopGroup 两个 EventLoopGroup 来进行工作。BossEventLoopGroup 通常是一个单线程的 EventLoop,即里面只有一个EventLoop来处理连接,EventLoop 维护着一个注册了 ServerSocketChannel 的 Selector 实例,EventLoop 的实现涵盖 IO 事件的分离和分发(Dispatcher)
- BossEventLoopGroup 只负责处理连接,故开销非常小,通常只有一个EventLoop,连接到来,马上按照策略将 SocketChannel 转发给 WorkerEventLoopGroup。
- WorkerEventLoopGroup 会由 next 随机选择其中一个 EventLoop 来将这 个SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行专门处理。
(2)客户端:通常只有一个EventLoopGroup来发起连接处理IO操作
(3)小结
- NioEventLoopGroup 实际上就是个线程池,一个 EventLoopGroup 包含一个或者多个 EventLoop;
- 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
- 所有有 EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
- 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
- 每一个 EventLoop 负责处理一个或多个 Channel;
(4)创建代码实示例
EventLoopGroup bossGroup = new NioEventLoopGroup();//接收请求交给worker
EventLoopGroup workerGroup = new NioEventLoopGroup();//处理事件2.ServerBootstrap和Bootstrap
ServerBootstrap是服务器端的辅助启动类,Bootstrap是客户端的辅助启动类。一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类,启动netty程序,配置连接参数等作用。
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerHandlerInitializer());
//服务器端添加线程组,NioServerSocketChannel通过反射机制创建类来监听端口,添加对每个连接channel的事件处理的handler处理器3.ChannelFuture与Listener监听器
(1)功能:Netty 为异步非阻塞的框架,即所有的 I/O 操作都为异步的即立即返回的,因此,我们不能立刻得知操作的结果和状态。因此Netty 提供了 ChannelFuture ,通过ChannelFuture的 addListener() 方法注册一个 ChannelFutureListener监听器,当操作执行成功或者失败时即操作完成时,监听器就会自动回调返回结果通知线程。
(2)状态:ChannelFuture有两种状态:completed和uncompleted。当一个IO操作开始时,会创建一个新的ChannelFuture,此时他处于一个uncompleted状态,当IO操作完成时,ChannelFuture就会被设置为completed,同时回调ChannelFutureListener中的operationComplete方法,此时他的完成状态有三种,如下图所示:

(3)示例代码
- ChannelFutureListener提供了一些封装好的Listener,比如ChannelFutureListener.CLOSE该监听器会在操作完成后关闭相应的channel通道:
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);//监听ctx的发送写入情况,写入完成后调用listener自动关闭通道- 自定义Listener:
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture arg0) throws Exception {
// TODO Auto-generated method stub
if(arg0.isSuccess()) {//successful
//do..
}else if(arg0.cause()!=null) {//fail
//do..
}else if(arg0.isCancelled()) {//cancel
//do..
}
}
});4.ChannelHandler与ChannelPipline
(1)ChannelHandler:Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。负责对到来的IO事件或IO操作选择性的进行拦截和处理,一个事件可以由多个handler处理。handler被分为两大类分别是入站inBoundHandler和出站outBoundHandler分别来处理入站和出站事件
(2)ChannelPipline:一个channel对应一个channelPipline(管道),channelPipline实际上是维护handler的一个双向链表,是handler的容器,一个channel可以有多个handler处理,对应的channelPipline和channelhandler之间有一个ChannelHandlerContext(上下文)类来影射和联系起来。关系图如下:

(3)注意事项
- ChannelInboundHandler之间的数据传递,需要通过调用 ctx.fireChannelRead(msg) 实现当前数据传递到下一个inboundhandler;
- 调用ctx.writeAndflush()会从当前handler位置向前寻找下一个outhandler;调用ctx.channel().writeAndflush()会从pipline尾部往前寻找outhandler输出
(4)消息的处理流程
- 消息到来时,由select()进行事件通知,触发底层的socketChannel.read()读取ByteBuf,触发ChannelRead事件,由IO线程EventLoop调用相应的ChannelPipline中的fireChannelRead()方法,将消息传输到ChannelPipline中
- 消息依次被handler1,handler2...拦截和处理,在这个过程中任何一个handler都可以中断消息的传递
- 消息处理完后调用write方法,消息经过多个outhandler处理添加到缓冲区BuyeBuf中,等待socketChannel.write()发送
(5)使用代码
/*ChannelHandler初始化器*/
public class MyServerHandlerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyServerHandler());
pipeline.addLast(new MyServerHandlerNext());
}
}5.监听端口和关闭服务
(1)同步退出方式
我们常见的Netty Demo服务器端都是这么写的,其中sync()是同步方法
- bind(8899).sync();表示服务器端同步阻塞等待服务器端绑定监听端口成功再往下执行
- closeFuture().sync();表示添加一个连接关闭监听器,同步阻塞等待连接关闭后再执行优雅退出释放资源
Demo这么写是因为,开启服务器的是当前主线程,执行监听端口和处理IO的是子线程,而线程之间是独立的,主线程执行完毕就会结束。为了方便调试,使用sync()来阻塞主线程等待子线程结束,而不是提前结束。这样使用的话,客户端可以主动向服务器发送消息,但是服务器主线程由于阻塞无法执行别的操作。
ChannelFuture future = serverBootstrap.bind(8899).sync();//同步阻塞直到完成
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();//优雅关闭
workerGroup.shutdownGracefully();(2)异步退出方式
采用异步方式这样可以不阻塞主线程
//绑定端口 同步等待成功
ChannelFuture future=b.bind(port).sync();
//采用非同步方法退出netty 通过异步的方法不会被阻塞
future.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// TODO Auto-generated method stub
//释放资源退出
System.out.println(future.channel().toString()+" 链路关闭");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
6 ByteBuf解析
(1) Netty ByteBuf是对Java NIO的ByteBuffer的一个优化,ByteBuffer有很多缺点,比如:
- ByteBuffer长度固定,不能动态扩展,容易造成越界错误
- ByteBuffer只有一个指针,需要通过flip()变换读写,容易出错和混乱
- ByteBuffer API较少,功能不全
因此Netty优化封装了ByteBuf,ByteBuf提供双指针readIndex和writeIndex来标记读写位置,同时ByteBuf在写入数据之前会自动进行判断,若剩余空间不足则会进行动态拓展。极大地提高了开发的便捷性。其示意图如下所示:

- readerIndex,0到readerindex这里就是不读的数据,也就是抛弃的数据;从readerIndex开始读数据。
- writerIndex,readerIndex到writerIndex这里就是没有读的数据,也就是可读的内容;从writeIndex开始写数据。
- capacity,writerIndex到capacity这段数据就是我们可以往里面写的空间
- maxCapacity,其实这里应该还有个maxCapacity(可以看做是在capacity后面),capacity到maxCapacity这里,是这个Byte还可以扩展的空间。
(2)用法介绍(API:https://netty.io/4.1/api/)
- 其常用API如下所示:
(1)基础读写操作(支持随机访问)
- readXXX()和writeXXX()方法将会推进其对应的索引readerIndex和writerIndex。自动推进
- getXXX()和setXXX()方法用于访问数据,对writerIndex和readerIndex无影响
(2)slice,slice(int,int)切片操作
- 返回原始ByteBuf可读字节的一部分, 他们共享这部分内容,修改返回的缓冲区或此缓冲区的内容会影响彼此的内容,他们维护单独的index和makers,此方法不会修改原始缓冲区的readerIndex或writerIndex。
(3)ByteBuf初始化
Netty提供了一个简单的成为Unpooled的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例。
buffer()
buffer(int initialCapacity)
buffer(int initialCapacity, int maxCapacity)返回一个未池化的基于堆内存存储的ByteBuf
wrappedBuffer() 返回了一个包装了给定数据的ByteBuf copiedBuffer() 返回了一个复制了给定数据的ByteBuf
7.简单服务器和客户端通信实例
(1)MyServer
package com.wx.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
public static void main(String []args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerHandlerInitializer());
ChannelFuture future = serverBootstrap.bind(8899).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
(2)MyServerHandlerInitializer初始化器
package com.wx.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class MyServerHandlerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyServerHandler());
pipeline.addLast(new MyServerHandlerNext());
}
}
(3)MyServerHandler处理器1
package com.wx.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler1 is added.");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler1 make channel active.");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler1 make channel inactive.");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;//消息都是存入缓冲区中
byte[] reg = new byte[buf.readableBytes()];//开辟一个buf中可读字节数大小的byte数组
buf.readBytes(reg);//将buf中的readIndex-writeIndex内的可读内容读入byte数组
String body = new String(reg, "UTF-8");//解码为字符串
System.out.println("The server receive order : " + body);
String respMsg = body.toUpperCase();
ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());//创建缓冲区
//ctx.writeAndFlush(buf);
ctx.fireChannelRead(respByteBuf);//发送给下一个InboundHandler处理
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler1 make channel registered.");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler1 make channel unregistered.");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.toString());
ctx.close();
}
}
(4)MyServerHandlerNext处理器2
package com.wx.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyServerHandlerNext extends ChannelInboundHandlerAdapter{
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler2 is added.");
}//handler被添加到通道时调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler2 make channel active.");
}//handler激活时调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler2 make channel inactive.");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//有消息传来时调用,ctx是一个联系handler与相应pipline的组件上下文
ByteBuf buf = (ByteBuf) msg;
byte[] reg = new byte[buf.readableBytes()];
buf.readBytes(reg);
String body = new String(reg, "UTF-8");
System.out.println("The server sended : " + body);
ByteBuf respByteBuf = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(respByteBuf);//发送回客户端
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler2 make channel registered.");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler2 make channel unregistered.");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
(5)MyClient客户端
package com.wx.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new MyClintInitializer());
ChannelFuture future = bootstrap.connect("127.0.0.1",8899).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
(6)客户端初始化器MyClintInitializer
package com.wx.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClintInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
ChannelPipeline pipeline = arg0.pipeline();
pipeline.addLast(new MyClientHandler());
}
}
(7)客户端MyClientHandler处理器
package com.wx.netty;
import java.util.Scanner;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// String mString = "a small character";
Scanner scanner = new Scanner(System.in);
String mString = scanner.nextLine();
ByteBuf respByteBuf = Unpooled.copiedBuffer(mString.getBytes());
ctx.writeAndFlush(respByteBuf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("client read");
ByteBuf buf = (ByteBuf) msg;
byte[] reg = new byte[buf.readableBytes()];
buf.readBytes(reg);
String body = new String(reg, "UTF-8");
System.out.println("Client received message: " + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, cause);
}
}
(8)运行结果
