使用netty和messagepack编写网络请求

在我们使用Netty开发基于网络的应用程序的时候,你都需要实现一些符合自己应用的codec,在Netty中也提供了很多种编解码的实现,在实现自定义编解码器的时候,我们只需要继承相关接口后,重写部分方法就可以实现decode和encode。例如在我们继承了ByteToMessageDecoder 类后只需要重写decode()方法就可以实现解码。

在学习Netty的过程中我实现了一个编解码的demo可以对MessagePack进行编解码。MessagePack是一个类似Json的序列化技术,据官方说它比json更小更快。

在这个例子中,在客户端中,MsgPackEncode在继承MessageToByteEncoder后重写了encode()方法,把Student对象编码为byte数组;在服务器端MsgPackDecode在继承MessageToMessageDecoder后重写了decode()方法,把byte数组解码成List<Object>。

客户端代码: TimeClient:

public class TimeClient {  
  
  
    private final static int port = 28080;  
  
  
    private final static String host = "127.0.0.1";  
  
  
    private static EventLoopGroup group = new NioEventLoopGroup();  
  
  
    public void createBootstrap(Bootstrap b, EventLoopGroup group) {  
        b.group(group).channel(NioSocketChannel.class)  
                .option(ChannelOption.TCP_NODELAY, true)  
                .handler(new ChannelInitializer<SocketChannel>() {  
  
  
                    @Override  
                    protected void initChannel(SocketChannel ch)  
                            throws Exception {  
                        ChannelPipeline pipeline = ch.pipeline();  
                        pipeline.addLast(new MsgPackDecode());  
                        pipeline.addLast(new MsgPackEncode());  
                        pipeline.addLast(new TimeClientHandle());  
                          
                    }  
  
  
                });  
        try {  
            ChannelFuture f = b.connect(host, port).sync();  
            f.channel().closeFuture().sync();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
            group.shutdownGracefully();  
        }  
    }  
  
  
    public static void main(String[] args) {  
        new TimeClient().createBootstrap(new Bootstrap(), group);  
    }  
  
  
}

TimeClientHandle:

public class TimeClientHandle extends ChannelInboundHandlerAdapter {  
  
  
  
  
    @Override  
    public void channelActive(ChannelHandlerContext ctx) throws Exception {  
        ctx.writeAndFlush(getStudent());  
    }  
  
  
    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
        ctx.flush();  
    }  
  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
            throws Exception {  
        ctx.close();  
    }  
      
    private Student getStudent(){  
        Student s = new Student();  
        s.setId(3);  
        s.setName("zhangsan");  
        return s;  
    }  
  
  
}

服务端代码: TimeServer:

public class TimeServer {  
  
  
    public void bind(int port) {  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workGroup = new NioEventLoopGroup();  
        ServerBootstrap b = new ServerBootstrap();  
        b.group(bossGroup, workGroup)  
            .channel(NioServerSocketChannel.class)  
            .option(ChannelOption.SO_BACKLOG, 1024)  
            .childHandler(new ChannelInitializer<SocketChannel>(){  
  
  
                    @Override  
                    protected void initChannel(SocketChannel ch)  
                            throws Exception {  
                        ChannelPipeline pipeline = ch.pipeline();  
                        pipeline.addLast(new MsgPackDecode());  
                        pipeline.addLast(new MsgPackEncode());  
                        ch.pipeline().addLast(new TimeServerHandle());  
                    }  
          
        });  
        try {  
            ChannelFuture f = b.bind(port).sync();  
            f.channel().closeFuture().sync();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
            bossGroup.shutdownGracefully();  
            workGroup.shutdownGracefully();  
        }  
    }  
      
      
    public static void main(String[] args) {  
        int port = 28080;  
        new TimeServer().bind(port);  
    }  
}

TimeSevrerHandle:

public class TimeServerHandle extends ChannelInboundHandlerAdapter {  
  
  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg)  
            throws Exception {  
        @SuppressWarnings("unchecked")  
        List<Student> s = (List<Student>) msg;  
        System.out.println(s);  
  
  
    }  
  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
            throws Exception {  
        ctx.close();  
    }  
  
  
}

其中服务端和客户端都有的Student类、MsgPackEncode、MsgPackDecode为: Student:

@Message  
public class Student {  
      
    private int id;  
      
    private String name;  
  
  
    public int getId() {  
        return id;  
    }  
  
  
    public void setId(int id) {  
        this.id = id;  
    }  
  
  
    public String getName() {  
        return name;  
    }  
  
  
    public void setName(String name) {  
        this.name = name;  
    }  
  
  
    @Override  
    public String toString() {  
        return "Student [id=" + id + ", name=" + name + "]";  
    }  
      
}

MsgPackEncode:

public class MsgPackEncode extends MessageToByteEncoder<Object> {  
  
  
    @Override  
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)  
            throws Exception {  
        MessagePack msgPack = new MessagePack();  
        System.out.println(msg == null);  
        byte[] raw = null;  
        raw = msgPack.write(msg);  
        out.writeBytes(raw);  
    }  
}

MsgPackDecode:

public class MsgPackDecode extends MessageToMessageDecoder<ByteBuf>{  
  
  
    @Override  
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg,  
            List<Object> out) throws Exception {  
        final byte[] array;  
        final int length = msg.readableBytes();  
        array = new byte[length];  
        msg.getBytes(msg.readableBytes(), array, 0, length);  
        MessagePack msgPack = new MessagePack();  
        out.add(msgPack.read(array));  
    }  
  
  
}

服务端控制台输出为:

[3,"zhangsan"]

在写这个demo的时候,遇到了两个坑:

  • 要传输的javabean一定要加上注解@message,否则会报错。
  • 最后服务端decode后的对象是一个List<Object>(害我研究了半天,不知道为什么报错,最后还是查看的官方文档才解决的),你直接用对象去接收,会报异常org.msgpack.type.ArrayValueImpl cannot be cast to com.nettyserver.Student