java netty5_netty5自定义私有协议实例

一般业务需求都会自行定义私有协议来满足自己的业务场景。私有协议也可以解决粘包和拆包问题:比如客户端发送数据时在消息头中携带数据包长度,服务端接收数据后先解析消息头,获取数据包长度值,再据此继续读取数据包内容。我们来看具体例子,自定义的协议如下:

+------------------------------------------+----------+
|                  消息头                  |  消息体  |
+-----------+--------+------+--------------+----------+
| Delimiter | Length | Type |   Reserved   |   Data   |
+-----------+--------+------+--------------+----------+

1)      Delimiter:4bytes,位于消息头起始处的分割标识,用于分割消息(如0xABEF0101)。

2)      Length:4bytes,消息体(Data)的数据长度。

3)      Type:1byte,消息类型。

4)      Reserved:1byte,保留。

5)      Data:消息体,即数据包内容,其长度由Length指定。

接下来看如何实现编解码:

1、先定义好javabean:

总体的:

package com.wlf.netty.nettyapi.javabean;

import lombok.Getter;

import lombok.Setter;

@Setter

@Getterpublic classNettyMessage {privateHeader header;private byte[] data;

@OverridepublicString toString() {return "NettyMessage{" +

"header=" + header +

", data=" + data +

'}';

}

}

头的:

package com.wlf.netty.nettyapi.javabean;

import lombok.Getter;

import lombok.Setter;

@Getter

@Setterpublic classHeader {/**

* 4bytes,消息头,用于分割消息。如0xABEF0101*/

private intdelimiter;/**

* 1byte,类型*/

private bytetype;/**

* 1byte,保留*/

private bytereserved;/**

* 数据长度*/

private intlength;

@OverridepublicString toString() {return "Header{" +

"delimiter=" + delimiter +

", length=" + length +

", type=" + type +

", reserved=" + reserved +

'}';

}

}

2、编码:

packagecom.wlf.netty.nettyapi.msgpack;importcom.wlf.netty.nettyapi.javabean.NettyMessage;importio.netty.buffer.ByteBuf;importio.netty.channel.ChannelHandlerContext;importio.netty.handler.codec.MessageToByteEncoder;public class NettyMessageEncoder extends MessageToByteEncoder{

@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, NettyMessage nettyMessage, ByteBuf byteBuf) throwsException {if (nettyMessage == null || nettyMessage.getHeader() == null) {throw new Exception("The nettyMessage is null.");

}//1、写入分割标志

byteBuf.writeInt(nettyMessage.getHeader().getDelimiter());//2、写入数据包长度

byteBuf.writeInt(nettyMessage.getData() != null ? nettyMessage.getData().length : 0);//3、写入请求类型

byteBuf.writeByte(nettyMessage.getHeader().getType());//4、写入预留字段

byteBuf.writeByte(nettyMessage.getHeader().getReserved());//5、写入数据

byteBuf.writeBytes(nettyMessage.getData() != null ? nettyMessage.getData() : null);

}

}

3、解码:

package com.wlf.netty.nettyapi.msgpack;

import com.wlf.netty.nettyapi.constant.Delimiter;
import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;

import java.util.List;

/**
 * Decodes the private wire format into {@link NettyMessage} objects:
 * delimiter (4 bytes), data length (4 bytes), type (1 byte), reserved (1 byte), then the data.
 *
 * <p>Each call to {@code decode} emits at most one message. When the buffered bytes do not yet
 * contain a complete frame, the method returns without consuming that frame so that decoding can
 * be retried once more bytes have arrived.
 */
public class NettyMessageDecoder extends ByteToMessageDecoder {

    /** Header size in bytes: delimiter (4) + length (4) + type (1) + reserved (1) = 10. */
    private static final int HEAD_LENGTH = 10;

    /** Size in bytes of the delimiter field that opens every frame. */
    private static final int DELIMITER_LENGTH = 4;

    /** Size in bytes of the data-length field that follows the delimiter. */
    private static final int LENGTH_FIELD_LENGTH = 4;

    /**
     * Tries to decode one frame from the accumulated bytes.
     *
     * @param channelHandlerContext context of the channel being read (not used here)
     * @param byteBuf               accumulated inbound bytes
     * @param list                  receives the decoded {@link NettyMessage}, if a full frame is available
     * @throws CorruptedFrameException if the length field of a frame is negative
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 1. Resynchronise: drop bytes one at a time until the reader index sits on a delimiter.
        //    getInt() peeks without moving the reader index, so nothing is read past what is
        //    actually buffered and the scan always makes progress.
        while (byteBuf.readableBytes() >= DELIMITER_LENGTH
                && byteBuf.getInt(byteBuf.readerIndex()) != Delimiter.DELIMITER) {
            byteBuf.skipBytes(1);
        }

        // A frame needs at least a full header; otherwise wait for more bytes.
        if (byteBuf.readableBytes() < HEAD_LENGTH) {
            return;
        }

        // 2. Peek at the payload length without consuming anything yet.
        int packStartIndex = byteBuf.readerIndex();
        int dataLength = byteBuf.getInt(packStartIndex + DELIMITER_LENGTH);
        if (dataLength < 0) {
            // Step over the bogus delimiter so that a later call can resynchronise.
            byteBuf.skipBytes(DELIMITER_LENGTH);
            throw new CorruptedFrameException("Negative data length: " + dataLength);
        }

        // The whole payload must be buffered; otherwise leave the frame untouched and wait.
        if (byteBuf.readableBytes() - HEAD_LENGTH < dataLength) {
            return;
        }

        // The frame is complete: consume delimiter and length, then the remaining fields.
        byteBuf.skipBytes(DELIMITER_LENGTH + LENGTH_FIELD_LENGTH);

        // 3. Message type.
        byte type = byteBuf.readByte();

        // 4. Reserved field.
        byte reserved = byteBuf.readByte();

        // 5. Payload; stays null for an empty frame.
        byte[] data = null;
        if (dataLength > 0) {
            data = new byte[dataLength];
            byteBuf.readBytes(data);
        }

        Header header = new Header();
        // The frame was matched against Delimiter.DELIMITER above, so that is the value on the wire.
        header.setDelimiter(Delimiter.DELIMITER);
        header.setLength(dataLength);
        header.setType(type);
        header.setReserved(reserved);

        NettyMessage nettyMessage = new NettyMessage();
        nettyMessage.setHeader(header);
        nettyMessage.setData(data);
        list.add(nettyMessage);
        // No explicit discardReadBytes() here: ByteToMessageDecoder manages the cumulation
        // buffer and reclaims consumed bytes itself.
    }
}

为了运行,我们需要写客户端和服务端的handler:

4、客户端handler:

packagecom.wlf.netty.nettyclient.handler;importcom.wlf.netty.nettyapi.constant.Delimiter;importcom.wlf.netty.nettyapi.constant.MessageType;importcom.wlf.netty.nettyapi.javabean.Header;importcom.wlf.netty.nettyapi.javabean.NettyMessage;importcom.wlf.netty.nettyapi.util.CommonUtil;importio.netty.channel.ChannelHandlerAdapter;importio.netty.channel.ChannelHandlerContext;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.ArrayUtils;importjava.io.RandomAccessFile;importjava.util.Arrays;/*** 客户端处理类*/@Slf4jpublic class NettyClientHandler extendsChannelHandlerAdapter {

@Overridepublic void channelActive(ChannelHandlerContext ctx) throwsException {

ctx.writeAndFlush(buildClientRequest());

}/*** 创建请求消息体

*

*@return

*/

privateNettyMessage buildClientRequest() {

NettyMessage nettyMessage= newNettyMessage();

Header header= newHeader();byte[] data = new byte[0];try{

data=buildPcmData();

}catch(Exception e) {

e.printStackTrace();

}

header.setDelimiter(0xABEF0101);

header.setLength(data.length);

header.setType((byte) 1);

header.setReserved((byte) 0);

nettyMessage.setHeader(header);//设置数据包

nettyMessage.setData(data);returnnettyMessage;

}/*** 构造PCM请求消息体

*

*@return

*/

private byte[] buildPcmData() throwsException {byte[] resultByte =longToBytes(System.currentTimeMillis());returnresultByte;

}/*** long转字节

*

*@paramvalues

*@return

*/

private byte[] longToBytes(longvalues) {byte[] buffer = new byte[8];for (int i = 0; i < 8; i++) {int offset = 64 - (i + 1) * 8;

buffer[i]= (byte) ((values >> offset) & 0xff);

}returnbuffer;

}/*** 将两个数组合并起来

*

*@paramarray1

*@paramarray2

*@return

*/

private byte[] addAll(byte[] array1, byte... array2) {byte[] joinedArray = new byte[array1.length +array2.length];

System.arraycopy(array1,0, joinedArray, 0, array1.length);

System.arraycopy(array2,0, joinedArray, array1.length, array2.length);returnjoinedArray;

}/*** 在处理过程中引发异常时被调用

*

*@paramctx

*@paramcause

*@throwsException*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {

log.error("[Client] netty client request error: {}", cause.getMessage());

ctx.close();

}

}

5、服务端handler:

packagecom.wlf.netty.nettyserver.handler;importcom.alibaba.fastjson.JSON;importcom.wlf.netty.nettyapi.constant.MessageType;importcom.wlf.netty.nettyapi.javabean.NettyMessage;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerAdapter;importio.netty.channel.ChannelHandlerContext;importlombok.extern.slf4j.Slf4j;

@Slf4jpublic class NettyServerHandler extendsChannelHandlerAdapter {

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {

NettyMessage nettyMessage=(NettyMessage) msg;if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 1) {

log.info("[server] server receive client message : {}", nettyMessage);if (nettyMessage == null || nettyMessage.getData() == null) {

log.error("nettyMessage is null.");

}//获取时间戳(8字节)

byte[] data =nettyMessage.getData();

ByteBuf buf=Unpooled.buffer(data.length);

buf.writeBytes(data);long startTime =buf.readLong();

log.info("data length: {}", data.length);

log.info("startTime: {}", startTime);

}

}

@Overridepublic voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

log.error("server received failed, error : {}", cause.getMessage());

cause.printStackTrace();

ctx.close();

}

}

最后,为了启动,我们还得再补两个启动类:

6、客户端:

packagecom.wlf.netty.nettyclient.client;importcom.wlf.netty.nettyapi.msgpack.NettyMessageDecoder;importcom.wlf.netty.nettyapi.msgpack.NettyMessageEncoder;importcom.wlf.netty.nettyclient.handler.NettyClientHandler;importio.netty.bootstrap.Bootstrap;import io.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;importlombok.extern.slf4j.Slf4j;importjava.net.InetSocketAddress;/*** 客户端

* 1.为初始化客户端,创建一个Bootstrap实例

* 2.为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据;

* 3.当连接被建立时,一个NettyClientHandler实例会被安装到(该Channel的一个ChannelPipeline中;

* 4.在一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点。*/@Slf4jpublic classNettyClient {private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

EventLoopGroup group= newNioEventLoopGroup();public void connect(int port, String host) throwsException {

NioEventLoopGroup workGroup= newNioEventLoopGroup();try{

Bootstrap bootstrap= newBootstrap();

bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Overridepublic void initChannel(SocketChannel channel) throwsException {

channel.pipeline().addLast(newNettyMessageDecoder());

channel.pipeline().addLast(newNettyMessageEncoder());

channel.pipeline().addLast(newNettyClientHandler());

}

});

ChannelFuture future=bootstrap.connect(host, port).sync();

future.channel().closeFuture().sync();

}finally{

workGroup.shutdownGracefully();

}

}public static void main(String[] args) throwsException {int port = 9911;new NettyClient().connect(port, "127.0.0.1");

}

}

7、服务端启动类:

packagecom.wlf.netty.nettyserver.server;importcom.wlf.netty.nettyapi.msgpack.NettyMessageDecoder;importcom.wlf.netty.nettyapi.msgpack.NettyMessageEncoder;importcom.wlf.netty.nettyserver.handler.NettyServerHandler;importio.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.logging.LogLevel;importio.netty.handler.logging.LoggingHandler;importlombok.extern.slf4j.Slf4j;

@Slf4jpublic classNettyServer {private final EventLoopGroup bossGroup = newNioEventLoopGroup();private final EventLoopGroup workGroup = newNioEventLoopGroup();public void bind(int port) throwsException{try{

ServerBootstrap serverBootstrap= newServerBootstrap();

serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG,100)

.handler(newLoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializer() {

@Overridepublic void initChannel(SocketChannel channel) throwsException {

channel.pipeline().addLast(newNettyMessageDecoder());

channel.pipeline().addLast(newNettyMessageEncoder());

channel.pipeline().addLast(newNettyServerHandler());

}

});//绑定端口

ChannelFuture channelFuture =serverBootstrap.bind(port).sync();

channelFuture.channel().closeFuture().sync();

}finally{

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}public static void main(String[] args) throwsException {int port = 9911;newNettyServer().bind(port);

}

}

直接跑上面两个启动类,先跑服务端,再跑客户端:

客户端输出:

17:20:04.258 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] client send data : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[B@432f82d5}

服务端输出:

17:20:04.295 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - [server] server receive client message : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[B@16eddbb3}

17:20:04.295 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - data length: 8

17:20:04.295 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - startTime: 1570785604258

以上解码在字节流总容量小于1024时都没问题,但超过就会出现服务端获取不到数据的问题,详情和解决办法请参见netty5拆包问题解决实例。


版权声明:本文为weixin_35689019原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。