最近学习了使用netty进行网络通信,对数据帧进行封装打包等,记录一下遇到的问题。网上很官方的描述就不再描述啦。其次,这个笔记是根据近期实现的项目而做的,因此比较偏向于与实物之间的通信协议问题。
一、Netty是什么
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架。
二、Netty作为服务器端的整体架构
出去一个监听器去调用服务之外,主要分为以下4个部分:
①NettyServer (or Client )进行服务的启动,在这里可以定义自己是服务端还是客户端,同时定义channel通道进行使用;
②Decoder 使用①中的channel通道拿到收取的报文,对接受到的数据帧进行解码拿到自己想要的信息;
③Encoder 同②所述,将想要发送给远程客户端的消息体封装成报文并放入channel通道进行发送;
④Handler 这里主要进行一些逻辑处理,例如通道刚刚激活后,要进行什么事务以及读通道中的消息;
1、 NettyServer
首先,来一段作为服务器端的代码(由于是新手,几乎每一句都去搜了是什么意思,写在了注释里,方便以后复习查看):
一些参数查看了这个博客:链接: link.
//NettyServer.java
private static ServerBootstrap serverBootstrap;
public static void initNetty(String ip, Integer port,Integer IDLETIME_READER,Integer IDLETIME_WRITER,Integer IDLETIME_ALL) throws InterruptedException {
//创建一个ServerBootstrap实例,是Netty的启动辅助类,提供了一些列的方法用于设置服务器的参数
serverBootstrap = new ServerBootstrap();
//设置线程池
EventLoopGroup boss = new NioEventLoopGroup();//用于服务器端接收客户端的连接
EventLoopGroup worker = new NioEventLoopGroup();//用于处理网络事件
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)//设置绑定服务器端的channel(NioServerSocketChannel)
.option(ChannelOption.SO_BACKLOG, 2048)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50
.option(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(512, 1024, 65536))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//以上三行设置接收缓冲区,这里感觉比较迷糊看不懂,以后继续看
.childOption(ChannelOption.SO_KEEPALIVE, true)//是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活
.childOption(ChannelOption.TCP_NODELAY, true)//用于启用或关于Nagle算法(尽可能发送大块数据,避免网络中充斥着许多小数据块)。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false
.childHandler(new ChannelInitializer<SocketChannel>() {//定义ChannelInitializer的匿名类对象,并作为childHandler方法的参数,将其赋值给childHandler
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("idleStateHandler", new IdleStateHandler(IDLETIME_READER, IDLETIME_WRITER, IDLETIME_ALL, TimeUnit.SECONDS))//服务端添加IdleStateHandler心跳检测处理器,READER是读超时时间,WRITER是写超时时间,ALL是写+读总共的(好像是),最后是前面三个时间的单位是SECONDS(秒)【我就因为没有修改单位傻傻的盯着表等了好久也没有断开连接55】一旦超时触发这个事件,就会进入到Handler的userEventTriggered中,我们可以自己定义相关处理
.addLast("decoder", new ServerDecoder())
.addLast("channelHandler", new ServerHandler())
.addLast("encoder", new ServerEncoder());//以上三个就是将你要进行的操作放到通道里,分别对应着下面要说到的三个文件,这里要注意顺序,例如作为服务器端,自然是需要先收到客户端传来的报文进入decoder进行解码,然后对解出来的数据进入Handler进行逻辑处理,处理后将想要返回给客服端的报文通过encoder加码发送出去
logger.info("success to initHandler!");//打印日志
}
});
try {
ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();//Future模式的channel对象,让线程进入wait状态,也就是main线程暂时不会执行到finally里面,nettyserver也持续运行,如果监听到关闭事件,可以优雅的关闭通道和nettyserver(为了让netty不会关闭)
logger.info("[Netty] - Server bootstrap bind to addr(" + ip + ":" + port + ")");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("Server start got exception!", e.getMessage());
} finally {
//关闭服务器
boss.shutdownGracefully();
worker.shutdownGracefully();
}
2、Handler
后来,看了一些博客发现,Handler分为入站和出站,进行对比后发现这个项目中仅仅使用了入站Handler,例如下图,可以定义以下一些方法:
- channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
- channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
- channelActive,ChannelHandlerContext的Channel已激活
- channelInactive,ChannelHanderContxt的Channel结束生命周期
- channelRead,从当前Channel的对端读取消息
- channelReadComplete,消息读取完成后执行
- userEventTriggered,一个用户事件被处罚
- channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
- exceptionCaught,重写父类ChannelHandler的方法,处理异常
//ServerHandler.java
public class ServerHandler extends ChannelInboundHandlerAdapter {
//ChannelPipeline中维护这一个由ChannelHandlerContext组成的双向链表,每个ChannelHandlerContext关联着一个ChannelHandler,具体如上方第一张图所示,处理入站事件,以及用户自定义事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
logger.info("into ServerHandler channelActive()........");
}
@Override
//代码中使用writeAndFlush()写入通道的信息,将会在这里读取
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx,msg);
logger.info("received message from client :");
}
//异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
//当NettyServer的IdleStateHandler的心跳报活超时触发后,执行以下逻辑
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
InetSocketAddress addr = (InetSocketAddress) ctx.channel().remoteAddress();
logger.info("链路检测超时,关闭链接:"+((IdleStateEvent) evt).state().toString() + " - " + addr.getAddress().getHostAddress() + ":" + addr.getPort());
ctx.channel().close();//关闭链接
}
}
3、Decoder
//ServerDecoder.java
public class ServerDecoder extends ByteToMessageDecoder {
//传输信息为不定长的16进制数据,由消息头+消息体+校验位组成
//对这个的理解,仅仅到了channel通道中的参数会写入byteBuf这样一个容器中,这个容器有一个读指针和一个写指针作为索引来使用数据
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
logger.info("into ServerDecoder decode()......");
byte[] msg = ByteBufUtil.getBytes(byteBuf);//将byteBuf转换为byte数组进行读写
System.out.println("Length: " + msg.length + " - msg: " + ByteUtil.bytesToHexString(msg, 0, msg.length));
byte[] fullMessage = null;
int bufLength = byteBuf.readableBytes();//数据的总长度
if (bufLength < 8) {//这里因为项目中消息头+crc校验码占8字节,因此一条报文的最短长度为8字节,一旦小于8字节,就执行return返回继续等待后续报文的传送
return;
}
// 标记游标位置
byteBuf.markReaderIndex();
// 解析起始标志,消息头中的起始标志占一个字节
byte[] head = new byte[1];
byteBuf.readBytes(head);//在byteBuf中拿出一个字节的数据,放到head这个byte[]数组中
String startIdent = ByteUtil.bytesToHexString(head, 0, 1);//将byte数组转为字符串格式,因为该变量具体定义时为字符串
assert startIdent != null;
if (startIdent.equals("xx")) {//项目的起始标识为xx,只有报文的第一个字节为xx才是该受到的报文,因此才开始解析
//这里也可以去解析其他需要的数据加以使用,仅举一个例子,去解析一个int类型的消息体长度字段
// 解析数据体长度
byte[] len_bs = new byte[2];
byteBuf.readBytes(len_bs);
int len = ByteBigUtil.getInt(len_bs);
// 计算帧长度,消息体长度+消息头+校验位
int fullLength = (len + 8);
if (bufLength < fullLength) {
// 长度不足,下一次
byteBuf.resetReaderIndex();//将游标重置,返回继续读取数据
return;
}
// 读取帧结构
byteBuf.resetReaderIndex();//为读取全部数据,重置游标
fullMessage = new byte[fullLength];
byteBuf.readBytes(fullMessage);//将全部数据存储至fullMessage中
} else {//若起始标识不正确,重置游标并返回
byteBuf.resetReaderIndex();
byteBuf.readByte();
return;
}
if (fullMessage != null) {
String info = ByteUtil.bytesToHexString(fullMessage, 0, fullMessage.length);
//获取ip地址和prot端口号
String ip = ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getAddress().getHostAddress();
Integer port = ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getPort();
logger.info("Received... (" + ip + ":" + port + ") data: " + info);
//这里使用反射去降低代码的耦合度,使用报文中的命令码跳转到相应的对象的构造方法中,进行初始化并执行业务逻辑;StaticUtil.protobuf_package定义了从com.至跳转对象的具体路径
Class<?> clazz = Class.forName(StaticUtil.protobuf_package + ByteUtil.bytesToHexString(fullMessage, 1, 2));//根据protobuf包名+命令码 反射
Constructor c = clazz.getConstructor(byte[].class, String.class, Integer.class,Channel.class);
Channel channel = channelHandlerContext.channel();
Object o = c.newInstance(fullMessage, ip, port,channel);//进入相应的构造方法
list.add(o);
}
}
}
4、Encoder
//ServerEncoder.java
public class ServerEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
String ip = ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getAddress().getHostAddress();
Integer port = ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getPort();
//o为刚刚decoder业务逻辑处理后要返回给客户端的对象
MessageBody msgBody = (MessageBody) o;//MessageBody为一个各个消息体的父类对象
Method method_writeToBytes = MessageBody.class.getMethod("xxxxwrite");//通过反射获取对象中一个名为xxxxwrite的方法,这个方法的主要功能是将想要返回的报文写入一个byte数组中
byte[] sendBs = (byte[]) method_writeToBytes.invoke(msgBody);//调用上述方法,返回该方法返回的一个byte数组,该数组即为要发送的全部报文信息
byteBuf.writeBytes(sendBs);
channelHandlerContext.writeAndFlush(byteBuf);//写入通道,发送
byteBuf.retain();//中途出现生命周期问题,使用该语句保留索引
System.out.println("encode ended......");
}
}
二、网络调试助手的使用
关于网络调试助手的下载可以去搜索一些其他的博客,我来说一下因为第一次使用网络调试助手遇到的问题,比较小白:
1、首先确定你自己写的是服务器端还是客户端,去调整这个助手,如果你的代码是实现服务器端,就调整为TCP Client;相反,客户端的话就调整为TCP Server。
2、下边的ip和端口号,根据NettyServer传入的ip和port参数进行调整,按理说应该在监听器中去写,但是这里单拿出一个作为测试连接的例子,可以直接写在NettyServer中:
public static void main(String[] args)throws Exception {
initNetty("127.0.0.1",8081,50,0,0);
}
参照以上的例子,就将网络调试助手设置成一下样子:
然后启动你的服务器,再点击连接就可以啦
3、明确传输报文的格式,明确是ASCII码还是16进制传输,这里我就遇到了很大的坑,当时解码怎么都解不出来,好不容易解出来只能一个字节数组存一位数字,后来才发现没有设置传输的形式,勾选的ASCII码,废了很大的劲完全写完整个项目后才发现这个助手可以调传输数据的方式,于是又废了一晚上大改整个项目…
这里还有一些其他的功能可以自己去了解一下。我也是一个在校大学生小白,因此有很多地方可能说的并不准确而且有问题,希望大家帮忙指正!