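When you build a network application with Netty, you usually need a codec that fits your own protocol. Netty ships with many codec implementations, and writing a custom one only takes extending the relevant base class and overriding a method or two. For example, after extending ByteToMessageDecoder you only need to override decode() to implement decoding.
While learning Netty I wrote a small codec demo that encodes and decodes MessagePack. MessagePack is a serialization format similar to JSON; according to its official site, it is smaller and faster than JSON.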
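To make the "smaller than JSON" claim concrete, here is a minimal sketch that hand-encodes the two Student fields used in this demo with the single-byte MessagePack formats (fixarray, positive fixint, fixstr). It does not use the msgpack library; the class name MsgPackLayoutSketch and its helpers are made up for illustration, and it assumes the bean is written as an array of its fields, which is what the server output later in this post suggests.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-rolled illustration of the MessagePack wire format; not the msgpack library.
final class MsgPackLayoutSketch {

    // Encodes [id, name] as a MessagePack array. Only covers 0 <= id <= 127 and
    // names shorter than 32 UTF-8 bytes, i.e. the single-byte "fix" formats.
    static byte[] encode(int id, String name) {
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        if (id < 0 || id > 127 || utf8.length > 31) {
            throw new IllegalArgumentException("outside the fix-format range of this sketch");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x90 | 2);             // fixarray header: 2 elements
        out.write(id);                   // positive fixint: the byte is the value
        out.write(0xa0 | utf8.length);   // fixstr header: length in the low 5 bits
        out.write(utf8, 0, utf8.length); // string payload
        return out.toByteArray();
    }

    // Renders bytes as space-separated lowercase hex.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] packed = encode(3, "zhangsan");
        byte[] json = "[3,\"zhangsan\"]".getBytes(StandardCharsets.UTF_8);
        System.out.println(toHex(packed)); // 92 03 a8 7a 68 61 6e 67 73 61 6e
        System.out.println(packed.length + " bytes vs " + json.length + " bytes as JSON"); // 11 vs 14
    }
}
```

The saving comes from the header bytes: the array and string lengths ride along in a single byte each, where JSON spends brackets, quotes, and a comma.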
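In this example, on the client side MsgPackEncode extends MessageToByteEncoder and overrides encode() to turn a Student object into a byte array. On the server side MsgPackDecode extends MessageToMessageDecoder and overrides decode() to turn the received bytes back into a MessagePack value, which is passed on through the List<Object> out parameter.
Client code: TimeClient: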
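import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    private final static int port = 28080;
    private final static String host = "127.0.0.1";
    private static EventLoopGroup group = new NioEventLoopGroup();

    public void createBootstrap(Bootstrap b, EventLoopGroup group) {
        b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch)
                            throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new MsgPackDecode());
                        pipeline.addLast(new MsgPackEncode());
                        pipeline.addLast(new TimeClientHandle());
                    }
                });
        try {
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the connection is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads on every exit path,
            // not only when the thread is interrupted.
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new TimeClient().createBootstrap(new Bootstrap(), group);
    }
}

TimeClientHandle: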
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandle extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send one Student as soon as the connection is established.
        ctx.writeAndFlush(getStudent());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }

    private Student getStudent() {
        Student s = new Student();
        s.setId(3);
        s.setName("zhangsan");
        return s;
    }
}

Server code: TimeServer:
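import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {
    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch)
                            throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new MsgPackDecode());
                        pipeline.addLast(new MsgPackEncode());
                        pipeline.addLast(new TimeServerHandle());
                    }
                });
        try {
            ChannelFuture f = b.bind(port).sync();
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Release both event loop groups on every exit path,
            // not only when the thread is interrupted.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 28080;
        new TimeServer().bind(port);
    }
}

TimeServerHandle: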
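import java.util.List;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandle extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // The decoder hands over a list of MessagePack values,
        // not a Student instance, so it is received as a list.
        List<?> s = (List<?>) msg;
        System.out.println(s);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }
}

The Student class, MsgPackEncode, and MsgPackDecode are shared by the server and the client: Student: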
import org.msgpack.annotation.Message;

// @Message is required so that MessagePack can serialize this bean.
@Message
public class Student {
    private int id;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Student [id=" + id + ", name=" + name + "]";
    }
}

MsgPackEncode:
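import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;

public class MsgPackEncode extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
            throws Exception {
        MessagePack msgPack = new MessagePack();
        System.out.println(msg == null); // debug output left over from the demo
        // Serialize the object and write the bytes to the outbound buffer.
        byte[] raw = msgPack.write(msg);
        out.writeBytes(raw);
    }
}

MsgPackDecode: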
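import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;

public class MsgPackDecode extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
            List<Object> out) throws Exception {
        final int length = msg.readableBytes();
        final byte[] array = new byte[length];
        // Copy starting at the reader index; the first argument of getBytes
        // is an absolute buffer index, not a byte count.
        msg.getBytes(msg.readerIndex(), array, 0, length);
        MessagePack msgPack = new MessagePack();
        out.add(msgPack.read(array));
    }
}

The server console prints: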
[3,"zhangsan"]

While writing this demo I ran into two pitfalls:
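- The JavaBean you want to send must carry the @Message annotation; otherwise serialization fails with an error.
- What the server gets after decode is a list of values, not a Student. (This cost me quite a while: I could not work out why it was failing, and only the official documentation cleared it up.) If you try to receive it directly as your own object, you get the exception org.msgpack.type.ArrayValueImpl cannot be cast to com.nettyserver.Student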
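
The second pitfall can be sketched without Netty or the msgpack library. In the snippet below a plain java.util.List stands in for the decoded value (an assumption made for illustration: the real elements are MessagePack value objects that would need to be unwrapped through the library's own API), and the hypothetical helper fromFields rebuilds a Student from the positional fields instead of casting the whole message.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java objects stand in for the decoded MessagePack values.
final class DecodedListSketch {

    // Trimmed-down copy of the Student bean from this post.
    static final class Student {
        final int id;
        final String name;

        Student(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "Student [id=" + id + ", name=" + name + "]";
        }
    }

    // Rebuilds a Student from the positional fields [id, name].
    static Student fromFields(List<?> fields) {
        if (fields.size() != 2) {
            throw new IllegalArgumentException("expected [id, name] but got " + fields);
        }
        int id = ((Number) fields.get(0)).intValue();
        String name = String.valueOf(fields.get(1));
        return new Student(id, name);
    }

    public static void main(String[] args) {
        // Stand-in for the msg argument that channelRead receives.
        Object msg = Arrays.asList(3, "zhangsan");

        // Student s = (Student) msg; // would throw ClassCastException: msg is a list

        Student s = fromFields((List<?>) msg);
        System.out.println(s); // Student [id=3, name=zhangsan]
    }
}
```

Mapping by position works here because the bean is serialized as an array of its fields in declaration order, so adding or reordering fields on one side only would silently break it. It is worth checking whether the msgpack version you use can read the bytes straight into a target class, which would avoid the manual mapping altogether.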