一般业务需求都会自行定义私有协议来满足自己的业务场景,私有协议也可以解决粘包和拆包问题,比如客户端发送数据时携带数据包长度,服务端接收数据后解析消息体,获取数据包长度值,据此继续获取数据包内容。我们来看具体例子,自定义的协议如下:
+--------------------------------------------------+----------+
| 消息头 | 消息体 |
| Delimiter | Length | Type | Reserved | data |
+-------------------------------------------------+----------+
1) Delimiter:4bytes,消息头,用于分割消息。
2) Length:数据长度。
3) Type:1bytes,消息类型。
4) Reserved:1bytes,保留。
5)Data包数据
接下来看如何实现编解码:
1、先定义好javabean:
总体的:
package com.wlf.netty.nettyapi.javabean;
import lombok.Getter;
import lombok.Setter;
@Setter
@Getterpublic classNettyMessage {privateHeader header;private byte[] data;
@OverridepublicString toString() {return "NettyMessage{" +
"header=" + header +
", data=" + data +
'}';
}
}
头的:
package com.wlf.netty.nettyapi.javabean;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setterpublic classHeader {/**
* 4bytes,消息头,用于分割消息。如0xABEF0101*/
private intdelimiter;/**
* 1byte,类型*/
private bytetype;/**
* 1byte,保留*/
private bytereserved;/**
* 数据长度*/
private intlength;
@OverridepublicString toString() {return "Header{" +
"delimiter=" + delimiter +
", length=" + length +
", type=" + type +
", reserved=" + reserved +
'}';
}
}
2、编码:
packagecom.wlf.netty.nettyapi.msgpack;importcom.wlf.netty.nettyapi.javabean.NettyMessage;importio.netty.buffer.ByteBuf;importio.netty.channel.ChannelHandlerContext;importio.netty.handler.codec.MessageToByteEncoder;public class NettyMessageEncoder extends MessageToByteEncoder{
@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, NettyMessage nettyMessage, ByteBuf byteBuf) throwsException {if (nettyMessage == null || nettyMessage.getHeader() == null) {throw new Exception("The nettyMessage is null.");
}//1、写入分割标志
byteBuf.writeInt(nettyMessage.getHeader().getDelimiter());//2、写入数据包长度
byteBuf.writeInt(nettyMessage.getData() != null ? nettyMessage.getData().length : 0);//3、写入请求类型
byteBuf.writeByte(nettyMessage.getHeader().getType());//4、写入预留字段
byteBuf.writeByte(nettyMessage.getHeader().getReserved());//5、写入数据
byteBuf.writeBytes(nettyMessage.getData() != null ? nettyMessage.getData() : null);
}
}
3、解码:
packagecom.wlf.netty.nettyapi.msgpack;importcom.wlf.netty.nettyapi.constant.Delimiter;importcom.wlf.netty.nettyapi.javabean.Header;importcom.wlf.netty.nettyapi.javabean.NettyMessage;importio.netty.buffer.ByteBuf;importio.netty.channel.ChannelHandlerContext;importio.netty.handler.codec.ByteToMessageDecoder;importjava.util.List;public class NettyMessageDecoder extends ByteToMessageDecoder{/*** 消息体字节大小:分割符字段4字节+长度字段4字节+请求类型字段1字节+预留字段1字节=10字节*/
private static final int HEAD_LENGTH = 10;
@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throwsException {//字节流开始位置
intpackStartIndex;while (true) {//获取字节流开始位置
packStartIndex =byteBuf.readerIndex();//若读取到分割标识,说明读取当前字节流开始位置了
if (byteBuf.readInt() ==Delimiter.DELIMITER) {break;
}//重置读索引为0
byteBuf.resetReaderIndex();//长度校验,字节流长度至少10字节,小于10字节则等待下一次字节流过来
if (byteBuf.readableBytes()
}
}//2、获取data的字节流长度
int dataLength =byteBuf.readInt();//校验数据包是否全部发送过来,总字节流长度(此处读取的是除去delimiter和length之后的总长度)减去type和reserved两个字节=data的字节流长度
int totalLength =byteBuf.readableBytes();if ((totalLength - 2)
byteBuf.readerIndex(packStartIndex);return;
}//3、请求类型
byte type =byteBuf.readByte();//4、预留字段
byte reserved =byteBuf.readByte();//5、数据包内容
byte[] data = null;if (dataLength > 0) {
data= new byte[dataLength];
byteBuf.readBytes(data);
}
NettyMessage nettyMessage= newNettyMessage();
Header header= newHeader();
header.setDelimiter(0xABEF0101);
header.setLength(dataLength);
header.setType(type);
header.setReserved(reserved);
nettyMessage.setHeader(header);
nettyMessage.setData(data);
list.add(nettyMessage);//回收已读字节
byteBuf.discardReadBytes();
}
}
为了运行,我们需要写客户端和服务端的handler:
4、客户端handler:
packagecom.wlf.netty.nettyclient.handler;importcom.wlf.netty.nettyapi.constant.Delimiter;importcom.wlf.netty.nettyapi.constant.MessageType;importcom.wlf.netty.nettyapi.javabean.Header;importcom.wlf.netty.nettyapi.javabean.NettyMessage;importcom.wlf.netty.nettyapi.util.CommonUtil;importio.netty.channel.ChannelHandlerAdapter;importio.netty.channel.ChannelHandlerContext;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.ArrayUtils;importjava.io.RandomAccessFile;importjava.util.Arrays;/*** 客户端处理类*/@Slf4jpublic class NettyClientHandler extendsChannelHandlerAdapter {
@Overridepublic void channelActive(ChannelHandlerContext ctx) throwsException {
ctx.writeAndFlush(buildClientRequest());
}/*** 创建请求消息体
*
*@return
*/
privateNettyMessage buildClientRequest() {
NettyMessage nettyMessage= newNettyMessage();
Header header= newHeader();byte[] data = new byte[0];try{
data=buildPcmData();
}catch(Exception e) {
e.printStackTrace();
}
header.setDelimiter(0xABEF0101);
header.setLength(data.length);
header.setType((byte) 1);
header.setReserved((byte) 0);
nettyMessage.setHeader(header);//设置数据包
nettyMessage.setData(data);returnnettyMessage;
}/*** 构造PCM请求消息体
*
*@return
*/
private byte[] buildPcmData() throwsException {byte[] resultByte =longToBytes(System.currentTimeMillis());returnresultByte;
}/*** long转字节
*
*@paramvalues
*@return
*/
private byte[] longToBytes(longvalues) {byte[] buffer = new byte[8];for (int i = 0; i < 8; i++) {int offset = 64 - (i + 1) * 8;
buffer[i]= (byte) ((values >> offset) & 0xff);
}returnbuffer;
}/*** 将两个数组合并起来
*
*@paramarray1
*@paramarray2
*@return
*/
private byte[] addAll(byte[] array1, byte... array2) {byte[] joinedArray = new byte[array1.length +array2.length];
System.arraycopy(array1,0, joinedArray, 0, array1.length);
System.arraycopy(array2,0, joinedArray, array1.length, array2.length);returnjoinedArray;
}/*** 在处理过程中引发异常时被调用
*
*@paramctx
*@paramcause
*@throwsException*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
log.error("[Client] netty client request error: {}", cause.getMessage());
ctx.close();
}
}
5、服务端handler:
packagecom.wlf.netty.nettyserver.handler;importcom.alibaba.fastjson.JSON;importcom.wlf.netty.nettyapi.constant.MessageType;importcom.wlf.netty.nettyapi.javabean.NettyMessage;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerAdapter;importio.netty.channel.ChannelHandlerContext;importlombok.extern.slf4j.Slf4j;
@Slf4jpublic class NettyServerHandler extendsChannelHandlerAdapter {
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
NettyMessage nettyMessage=(NettyMessage) msg;if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 1) {
log.info("[server] server receive client message : {}", nettyMessage);if (nettyMessage == null || nettyMessage.getData() == null) {
log.error("nettyMessage is null.");
}//获取时间戳(8字节)
byte[] data =nettyMessage.getData();
ByteBuf buf=Unpooled.buffer(data.length);
buf.writeBytes(data);long startTime =buf.readLong();
log.info("data length: {}", data.length);
log.info("startTime: {}", startTime);
}
}
@Overridepublic voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("server received failed, error : {}", cause.getMessage());
cause.printStackTrace();
ctx.close();
}
}
最后,为了启动,我们还得再补两个启动类:
6、客户端:
packagecom.wlf.netty.nettyclient.client;importcom.wlf.netty.nettyapi.msgpack.NettyMessageDecoder;importcom.wlf.netty.nettyapi.msgpack.NettyMessageEncoder;importcom.wlf.netty.nettyclient.handler.NettyClientHandler;importio.netty.bootstrap.Bootstrap;import io.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;importlombok.extern.slf4j.Slf4j;importjava.net.InetSocketAddress;/*** 客户端
* 1.为初始化客户端,创建一个Bootstrap实例
* 2.为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据;
* 3.当连接被建立时,一个NettyClientHandler实例会被安装到(该Channel的一个ChannelPipeline中;
* 4.在一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点。*/@Slf4jpublic classNettyClient {private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
EventLoopGroup group= newNioEventLoopGroup();public void connect(int port, String host) throwsException {
NioEventLoopGroup workGroup= newNioEventLoopGroup();try{
Bootstrap bootstrap= newBootstrap();
bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Overridepublic void initChannel(SocketChannel channel) throwsException {
channel.pipeline().addLast(newNettyMessageDecoder());
channel.pipeline().addLast(newNettyMessageEncoder());
channel.pipeline().addLast(newNettyClientHandler());
}
});
ChannelFuture future=bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
}finally{
workGroup.shutdownGracefully();
}
}public static void main(String[] args) throwsException {int port = 9911;new NettyClient().connect(port, "127.0.0.1");
}
}
7、服务端启动类:
packagecom.wlf.netty.nettyserver.server;importcom.wlf.netty.nettyapi.msgpack.NettyMessageDecoder;importcom.wlf.netty.nettyapi.msgpack.NettyMessageEncoder;importcom.wlf.netty.nettyserver.handler.NettyServerHandler;importio.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.logging.LogLevel;importio.netty.handler.logging.LoggingHandler;importlombok.extern.slf4j.Slf4j;
@Slf4jpublic classNettyServer {private final EventLoopGroup bossGroup = newNioEventLoopGroup();private final EventLoopGroup workGroup = newNioEventLoopGroup();public void bind(int port) throwsException{try{
ServerBootstrap serverBootstrap= newServerBootstrap();
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(newLoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Overridepublic void initChannel(SocketChannel channel) throwsException {
channel.pipeline().addLast(newNettyMessageDecoder());
channel.pipeline().addLast(newNettyMessageEncoder());
channel.pipeline().addLast(newNettyServerHandler());
}
});//绑定端口
ChannelFuture channelFuture =serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}public static void main(String[] args) throwsException {int port = 9911;newNettyServer().bind(port);
}
}
直接跑上面两个启动类,先跑服务端,再跑客户端:
客户端输出:
17:20:04.258 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] client send data : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[B@432f82d5}
服务端输出:
17:20:04.295 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - [server] server receive client message : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[B@16eddbb3}
17:20:04.295 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - data length: 817:20:04.295 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - startTime: 1570785604258
以上解码在字节流总容量小于1024时都没问题,但超过就会出现服务端获取不到数据的问题,详情和解决办法请参见netty5拆包问题解决实例。