Netty入门(四) --- 编码和解码,Netty的handler调用机制,TCP粘包和拆包原理和解决


往期

七、Google Protobuf

7.1 编码和解码简介

编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码

codec(编解码器) 的组成部分有两个:decoder(解码器)encoder(编码器)

image-20220124215502699

7.2 Netty本身的编解码机制和问题分析

Netty自身提供了一些编解码器

  • Netty提供的编码器:
    • StringEncoder,对字符串数据进行编码
    • ObjectEncoder,对 Java 对象进行编码
    • 。。。
  • Netty提供的解码器:
    • StringDecoder, 对字符串数据进行解码
    • ObjectDecoder,对 Java 对象进行解码

问题:

Netty本身带的ObjectDecoder 和 ObjectEncoder 虽然可以用来实现 POJO 对象或各种业务对象的编码和解码

但是底层使用的是Java序列化技术

  • Java序列化本身效率不高,
  • 且无法跨语言,
  • 序列化后体积太大,是二进制码的的5倍多
  • 序列化性能太低

所以引出新的解决方案:Google Protobuf

7.3 Protobuf基本介绍

Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用 remote procedure call ] 数据交换格式

目前很多公司在从 http+json 向 rpc+protobuf 转型

参考文档: https://developers.google.com/protocol-buffers/docs/proto

优点:

  • 支持跨平台、跨语言,即客户端服务器端可以使用不同语言编写

  • 高性能,高可靠

  • 使用 protobuf 编译器能自动生成代码

  • 通过 protoc.exe 编译器

    image-20220124220859308

7.4 Protobuf发送单对象

客户端可以发送一个Student PoJo对象到服务器 (通过 Protobuf 编码)

服务端能接收Student PoJo对象,并显示信息(通过 Protobuf 解码)

1、idea安装如下插件

image-20220124221035760

2、新建一个Student.proto文件

syntax = "proto3";  //协议版本
option java_outer_classname = "StudentPOJO";    //生成的外部类名,同时也是.java的文件名
//protobuf 使用message 管理数据
message Student{    //在StudentPOJO这个外部类里面生成一个 内部类Student,是真正发送的pojo对象
    int32 id = 1;   //Student类中有一个属性 id 类型为 int32,对应java里的int。  1 不是指,是属性的序号
    string name = 2;
}
  • protobuf 使用message 管理数据
  • message 等号后面是属性的编号,不是值

3、使用 protoc.exe 编译。如图:Student.proto ==》StudentPOJO.java

生成java文件的语句:protoc.exe --java_out=. xxx.proto

image-20220124221304951

4、拷贝回项目

image-20220124221618684

发现StudentPOJO有一个Student内部类

5、编写客户端

NettyClient

public class NettyClient {
    public static void main(String[] args) {

        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)     //设置线程组
                    .channel(NioSocketChannel.class)//设置客户端通道实现类
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //客户端发送数据,需要加上编码器
                            pipeline.addLast("ProtobufEncoder",new ProtobufEncoder());

                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("客户端 ok");

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}

注意:ChannelInitializer里要在pipeline里添加编码器(客户端发送)

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送一个 Student 对象到服务器
        StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(6).setName("豹子头 零冲").build();
        ctx.writeAndFlush(student);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

注意:使用StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(6).setName("豹子头 零冲").build();获取生成的那个类的实例

6、编写服务器端

NettyServer

public class NettyServer {
    public static void main(String[] args) {

            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器启动对象,用来配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)   //线程队列得到连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)   //设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //服务器端接收数据,解码
                            pipeline.addLast("ProtobufDecoder",new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            System.out.println("... 服务器 ok ...");
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

pipeline.addLast("ProtobufDecoder",new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance())); 配置解码器,需要和Protobuf生成类类型对应

NettyServerHandler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        StudentPOJO.Student student = (StudentPOJO.Student) msg;
        System.out.println("客户端发来:id="+student.getId()+" 姓名="+student.getName());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端",CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

StudentPOJO.Student student = (StudentPOJO.Student) msg;对msg进行强转

7、运行截图

image-20220124222043899

7.5 Protobuf发送多对象

真实业务中不可能就一种类型,可能有多种类型,这次来传递多种类型

1、编写Message.proto

syntax = "proto3";
option optimize_for = SPEED;//加快解析
option java_package = "com.bandit.netty.codec2";//指定生成包
option java_outer_classname = "MyDataInfo";//外部类名称

//protobuf 可以使用message管理其他message
message MyMessage{
    enum DateType {
        StudentType = 0;//在proto3 要求enum编号从0开始
        TeacherType = 1;
    }
    //用data_type 来标识传递的是哪个枚举类型
    DateType date_type = 1;

    //表示每次枚举类型最多只能出现其中的一个
    oneof dataBody{
        Student student = 2;
        Teacher teacher = 3;
    }
}

//Student类
message Student{
    int32 id = 1;
    string name = 2;
}
//Teacher类
message Teacher{
    string name = 1;
    int32 id = 2;
    int32 age = 3;
}
  • option java_package:指定生成包
  • option optimize_for = SPEED:加快解析
  • protobuf可以使用一个message管理其它多个message
    • 在里面创建一个枚举类型,在生成java文件后方便判断编号对应哪个数据
    • DateType date_type = 1:用data_type 来标识传递的是哪个枚举类型
    • oneof dataBody:表示每次枚举类型最多只能出现其中的一个

2、客户端,和上例类似

NettyClient

public class NettyClient {    public static void main(String[] args) {        EventLoopGroup eventExecutors = new NioEventLoopGroup();        try {            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(eventExecutors)     //设置线程组                    .channel(NioSocketChannel.class)//设置客户端通道实现类                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            ChannelPipeline pipeline = socketChannel.pipeline();                            //客户端发送数据,需要加上编码器                            pipeline.addLast("ProtobufEncoder",new ProtobufEncoder());                            pipeline.addLast(new NettyClientHandler());                        }                    });            System.out.println("客户端 ok");            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();            channelFuture.channel().closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            eventExecutors.shutdownGracefully();        }    }}

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //随机发送Student 或者 Teacher        int random = new Random().nextInt(3);        MyDataInfo.MyMessage myMessage = null;        if (0==random) {            myMessage = MyDataInfo.MyMessage.newBuilder()                    .setDateType(MyDataInfo.MyMessage.DateType.StudentType)                    .setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("学生5号").build()).build();        } else{            myMessage = MyDataInfo.MyMessage.newBuilder()                    .setDateType(MyDataInfo.MyMessage.DateType.TeacherType)                    .setTeacher(MyDataInfo.Teacher.newBuilder().setId(1).setAge(20).setName("老师1号").build()).build();        }        ctx.writeAndFlush(myMessage);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}

在NettyClientHandler中随机发送datatype不同的数据

3、服务端

NettyServer

public class NettyServer {    public static void main(String[] args) {            EventLoopGroup bossGroup = new NioEventLoopGroup(1);            EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            //创建服务器启动对象,用来配置参数            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup,workerGroup)                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG,128)   //线程队列得到连接的个数                    .childOption(ChannelOption.SO_KEEPALIVE,true)   //设置保持活动连接状态                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            ChannelPipeline pipeline = socketChannel.pipeline();                            //服务器端接收数据,解码                            pipeline.addLast("ProtobufDecoder",new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));                            pipeline.addLast(new NettyServerHandler());                        }                    });            System.out.println("... 服务器 ok ...");            ChannelFuture channelFuture = bootstrap.bind(6666).sync();            channelFuture.channel().closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}

pipeline.addLast("ProtobufDecoder",new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));和上例类似

NettyServerHandler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MyDataInfo.MyMessage message = (MyDataInfo.MyMessage) msg;
        //根据dataType显示不同信息
        MyDataInfo.MyMessage.DateType dateType = message.getDateType();
        if (dateType == MyDataInfo.MyMessage.DateType.StudentType){
            MyDataInfo.Student student = message.getStudent();
            System.out.println("学生id:"+student.getId()+" 学生姓名:"+student.getName());
        } else if (dateType == MyDataInfo.MyMessage.DateType.TeacherType){
            MyDataInfo.Teacher teacher = message.getTeacher();
            System.out.println("老师id:"+teacher.getId()+" 年龄:"+teacher.getAge()+"岁 姓名:"+teacher.getName());
        } else {
            System.out.println("传输类型不正确");
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端",CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

根据dataType显示不同信息

4、运行截图

image-20220124223209432

八、Netty编解码器和handler调用机制

8.1 基本说明

ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器 。同时,ChannelPipeline又是ChannelHandler链的容器

  • 实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),就可以接收入站事件和数据,并对这些事数据进行业务处理
  • 当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。
  • 业务逻辑通常写在一个或者多个ChannelInboundHandler中
  • 相反还有个ChannelOutboundHandler,原理和ChannelInboundHandler一样,只是入站出站方向不同

入站和出站

无论是对客户端还是服务器端,数据送出去就是出站,从另外一端传入进来就是入站

image-20220124224124049

ChannelHandler实际上可以看做一个过滤器,ChannelPipeline可以看作过滤器的链

此处是责任链模式

8.2 编解码器基本原理

实际上,编码器实现了ChannelOutboundHandler,对出站数据进行加工;解码器实现了ChannelInboundHadnler,对入站数据进行加工。 在这些类中,channelRead方法已经被重写了。

以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler,进行下一步处理

image-20220124224655413

8.3 Netty的handler链调用机制

使用自定义的编码器和解码器来说明Netty的handler 调用机制

客户端发送long -> 服务器

服务端发送long -> 客户端


1、编码器和解码器

MyLongToByteEncoder编码器,Long转成Byte

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Long msg, ByteBuf byteBuf) throws Exception {
        System.out.println("MyLongToByteEncoder->encode 被调用");
        System.out.println("msg="+msg);
        byteBuf.writeLong(msg);
    }
}

MyByteToLongDecoder解码器,Byte转成Long

public class MyByteToLongDecoder extends ByteToMessageDecoder {

    /**
     *  decode 会根据接收的数据,被调用多次,直到没有心得元素被添加到list,
     *  或者是ByteBuf没有更多可读字节为止
     *  如果list不为空,就会将内容交给下一个channelinboundhandler处理,该处理器方法也会被调用多次
     *
     * @param ctx 上下文对象
     * @param in  入站的 ByteBuf
     * @param list  将解码后的数据交给下一个handler处理
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception {
        System.out.println("MyByteToLongDecoder->decode被调用");
        if(in.readableBytes()>=8){//Long 8个字节
            list.add(in.readLong());
        }
    }
}
  • decode 会根据接收的数据,被调用多次,直到没有心得元素被添加到list,
  • 或者是ByteBuf没有更多可读字节为止
  • 如果list不为空,就会将内容交给下一个channelinboundhandler处理,该处理器方法也会被调用多次

2、客户端:

MyClient

public class MyClient {
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer());

            System.out.println("客户端准备好了。。");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }


    }
}

MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //发送消息,出站,加入编码器 MyLongToByteEncoder
        pipeline.addLast("ClientLongToByteEncoder",new MyLongToByteEncoder());
        pipeline.addLast("ClientByteToLongDecoder",new MyByteToLongDecoder());
        pipeline.addLast(new MyClientHandler());
    }
}

发送消息,出站,加入编码器 MyLongToByteEncoder,后续接收消息加入解码器

MyClientHandler

public class MyClientHandler extends SimpleChannelInboundHandler<Long> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("服务器"+ctx.channel().remoteAddress()+"消息:"+msg);
    }

    //重写channelActive发送数据
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyClientHandler发送数据");
        ctx.writeAndFlush(123456L);//直接发一个Long
    }
}

客户端直接发送一个Long

3、服务器端

MyServer

public class MyServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());//自定义一个初始化类

            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            System.out.println("服务器准备好了。。");
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //服务端接收消息,解码 MyBateToLongDecoder
        pipeline.addLast("ServerBateToLongDecoder",new MyByteToLongDecoder());
        pipeline.addLast("ServerLongToByteEncoder",new MyLongToByteEncoder());
        pipeline.addLast(new MyServerHandler());
    }
}

服务器端接收消息,加入解码器 MyBateToLongDecoder,返回消息加入编码器

MyServerHandler

public class MyServerHandler extends SimpleChannelInboundHandler<Long> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("MyServerHandler读取客户端"+ctx.channel().remoteAddress()+"发来的消息:"+msg);
        //给客户端回送一个Long
        ctx.writeAndFlush(98765L);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

4、运行截图

image-20220124225946332

可以看到:

客户端先发消息,所以先编码,后面接收消息再解码

服务器端先接收消息,所以先解码,后面发送消息再编码

8.4 一些简单的解码器

8.4.1 ByteToMessageDecoder 解码器

关系继承图:

image-20220124224621844

一个实例分析:

public class ToIntegerDecoder extends ByteToMessageDecoder {
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{	
        if (in.readableBytes() >= 4) {
        	out.add(in.readInt());
        }
	}
}

这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。 当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。 int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据

8.4.2 ReplayingDecoder解码器

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法判断长度是否够用。参数S指定了用户状态管理的类型,其中Void代表不需要状态管理

ReplayingDecoder使用方便,但它也有一些局限性:

  • 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个 UnsupportedOperationException
  • ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder ,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢

8.4.3 其他的编解码器

  • 解码器
    • LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
    • DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
    • HttpObjectDecoder:一个HTTP数据的解码器
    • LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理 黏包和半包消息。
    • 。。。
  • 编码器
    • ObjectEncoder:Java 各种类型
    • SocksMessageEncoder:SOCK协议编码器
    • ZlibEncoder:压缩
    • Bzip2Encoder:压缩
    • 。。。

image-20220124231428302

九、TCP粘包和拆包

9.1 粘包和拆包简介

TCP是面向连接的,面向流的,提供高可靠性服务。

收发两端(客户端和服务器端) 都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。

这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的

图解:

image-20220124235757224

9.2 粘包和拆包现象实例

准备一个客户端向服务器端连续发送10个包的demo

1、客户端:

TCPClient

public class TCPClient {    public static void main(String[] args) {        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();        try {            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(eventLoopGroup)                    .channel(NioSocketChannel.class)                    .handler(new TCPClientInitializer());            System.out.println("客户端准备好了。。");            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();            channelFuture.channel().closeFuture().sync();        } catch (Exception e) {            e.printStackTrace();        } finally {            eventLoopGroup.shutdownGracefully();        }    }}

TCPClientInitializer

public class TCPClientInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();        pipeline.addLast(new TCPClientHandler());    }}

TCPClientHandler

public class TCPClientHandler extends SimpleChannelInboundHandler<ByteBuf> {    private int count;    //发包    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //客户端发十次包 hello,sever 编号        for (int i = 0; i < 10; i++) {            ByteBuf byteBuf = Unpooled.copiedBuffer("hello,sever " + i, CharsetUtil.UTF_8);            ctx.writeAndFlush(byteBuf);        }    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {        byte[] buffer = new byte[msg.readableBytes()];        msg.readBytes(buffer);        String message = new String(buffer, CharsetUtil.UTF_8);        System.out.println("客户端接收到消息:"+message);        System.out.println("客户端接收到几次:"+ ++count);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();    }}

在TCPClientHandler中客户端连续发送十次包:hello,sever 编号

2、服务器端

TCPServer

public class TCPServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new TCPServerInitializer());//自定义一个初始化类

            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            System.out.println("服务器准备好了。。");
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

TCPServerInitializer

public class TCPServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new TCPServerHandler());
    }
}

TCPServerHandler

public class TCPServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        String message = new String(buffer, CharsetUtil.UTF_8);
        System.out.println("服务器端接收到:"+message);
        System.out.println("服务器端接收了几次"+ ++count);

        ByteBuf responseBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString().substring(0, 5), CharsetUtil.UTF_8);
        ctx.writeAndFlush(responseBuf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

服务器打印接收到的数组以及接收到了几次

3、运行截图

image-20220125000308970

我开了两次客户端,第一次全部粘在一起了,第二次后面8个包全粘在一起了

9.3 解决方案:编码解码

使用自定义协议 + 编解码器 来解决

1、新建一个MessageProtocol实体类,用来传输长度和内容

public class MessageProtocol {    private int len;    private byte[] content;    public MessageProtocol(int len, byte[] content) {        this.len = len;        this.content = content;    }    public int getLen() {        return len;    }    public void setLen(int len) {        this.len = len;    }    public byte[] getContent() {        return content;    }    public void setContent(byte[] content) {        this.content = content;    }}

2、新建一个对MessageProtocol的编码器和解码器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception {        System.out.println("MessageEncoder encode 被调用");        byteBuf.writeInt(messageProtocol.getLen());        byteBuf.writeBytes(messageProtocol.getContent());    }}
public class MessageDecoder extends ReplayingDecoder<Void> {    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {        System.out.println("MessageDecoder decode 被调用");        //将字节码转成 MessageProtocol对象 数据包        int length = byteBuf.readInt();        byte[] content = new byte[length];        byteBuf.readBytes(content);        //封装成 MessageProtocol 放入list        list.add(new MessageProtocol(length,content));    }}

3、在刚刚的TCPServerInitializer和TCPClientInitializer添加编码器和解码器

public class TCPServerInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();        pipeline.addLast("ServerDecoder",new MessageDecoder());        pipeline.addLast("ServerEncoder",new MessageEncoder());        pipeline.addLast(new TCPServerHandler());    }}
public class TCPClientInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();        pipeline.addLast("ClientDecoder",new MessageDecoder());        pipeline.addLast("ClientEncoder",new MessageEncoder());        pipeline.addLast(new TCPClientHandler());    }}

4、重写TCPClientHandler和TCPServerHandler

public class TCPClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    private int count;
    //管道激活就开始发包
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //客户端发十次包  今天几号?i 号咯
        for (int i = 10; i < 17; i++) {
            String message = "今天几号? "+i+"号咯";
            byte[] content = message.getBytes(StandardCharsets.UTF_8);
            int length = content.length;

            MessageProtocol messageProtocol = new MessageProtocol(length, content);
            ctx.writeAndFlush(messageProtocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        byte[] content = msg.getContent();
        int len = msg.getLen();
        System.out.println("客户端接收到信息:len="+len+" content="+ new String(content, CharsetUtil.UTF_8));
        System.out.println("客户端接收到信息次数"+ ++count + "\n");
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}
public class TCPServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("服务端接收到信息:len="+len+" content="+ new String(content, CharsetUtil.UTF_8));
        System.out.println("服务端接收到信息次数"+ ++count + "\n");

        //回复消息
        String responseContent = UUID.randomUUID().toString().substring(0, 5);
        byte[] resBytes = responseContent.getBytes(StandardCharsets.UTF_8);
        int resLen = resBytes.length;
        MessageProtocol messageProtocol = new MessageProtocol(resLen, resBytes);
        ctx.writeAndFlush(messageProtocol);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

5、运行截图

image-20220125000843634

可以看到,进行编码解码之后,就不会粘包和拆包了


版权声明:本文为qq_45380083原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。