Protobuf整合netty多种消息类型处理方案
1-1 前言
大致上和前一篇文档里写的类似。其关键点还是procto文件的中做的文章,大致上的思路是我们定
义一个消息类型在最外层,然后需要的消息类型包装在里面,让后定义一个枚举,枚举里面就是定义
各种消息类型的标识,通过这样一种方式,可以实现动态的多消息类型的传输。
1-2 proto文件
syntax = "proto2";
package com.zt.proto;
option optimize_for = SPEED;
option java_package = "com.zt.proto";
option java_outer_classname = "MessageInfo";
message MessageData {
enum MessageType{
DOG = 1;
CAT = 2;
PIG = 3;
}
optional MessageType messageType = 1 ;
oneof MessageBody{
Dog dog = 2;
Cat cat = 3;
Pig pig = 4;
}
}
message Dog {
optional string name = 1;
optional int32 age = 2;
}
message Cat {
optional string name = 1;
optional int32 age = 2;
optional int32 sex = 3;
}
message Pig {
optional string name = 1;
optional int32 age = 2;
}
- 执行的命令
protoc --java_out=src\main\java src\protobuf\PersonInfo.proto
1-3 netty相关
1-3-1 TestServer
package com.zt.proto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* @Description:
* @Date: 2020/12/15 10:53
* @author: zt
*/
public class TestServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TestInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
1-3-2 TestInitializer
package com.zt.proto;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
* @Description:
* @Date: 2020/12/15 10:59
* @author: zt
*/
public class TestInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MessageInfo.MessageData.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
// 第一次没出现Bf shu'xua是ProtobufEncoder
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestServerHandler());
}
}
1-3-3 TestServerHandler
package com.zt.proto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @Description:
* @Date: 2020/12/15 14:27
* @author: zt
*/
public class TestServerHandler extends SimpleChannelInboundHandler<MessageInfo.MessageData> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageInfo.MessageData msg) {
MessageInfo.MessageData.MessageType messageType = msg.getMessageType();
if (messageType == MessageInfo.MessageData.MessageType.Dog) {
MessageInfo.Dog data = msg.getDog();
System.out.println(data.getAge());
System.out.println(data.getName());
} else if (messageType == MessageInfo.MessageData.MessageType.Cat) {
MessageInfo.Cat data = msg.getCat();
System.out.println(data.getAge());
System.out.println(data.getName());
System.out.println(data.getSex());
} else {
MessageInfo.Pig data = msg.getPig();
System.out.println(data.getAge());
System.out.println(data.getName());
}
}
}
1-3-4 TestClient
package com.zt.proto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @Description:
* @Date: 2020/12/15 14:28
* @author: zt
*/
public class TestClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new TestClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
1-3-5 TestClientInitializer
package com.zt.proto;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
* @Description:
* @Date: 2020/12/15 14:31
* @author: zt
*/
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MessageData.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestClientHandler());
}
}
1-3-6 TestClientHandler
package com.zt.proto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Random;
/**
* @Description:
* @Date: 2020/12/15 14:37
* @author: zt
*/
public class TestClientHandler extends SimpleChannelInboundHandler<MessageInfo.MessageData> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageInfo.MessageData msg) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 这里模拟随机发送数据
int dataType = new Random().nextInt(3);
MessageInfo.MessageData msg = null;
switch (dataType) {
case 0:
msg = MessageInfo.MessageData.newBuilder()
.setMessageType(MessageInfo.MessageData.MessageType.Dog)
.setDog(MessageInfo.Dog.newBuilder()
.setAge(1)
.setName("一只狗")
.build())
.build();
break;
case 1:
msg = MessageInfo.MessageData.newBuilder()
.setMessageType(MessageInfo.MessageData.MessageType.Cat)
.setCat(MessageInfo.Cat.newBuilder()
.setAge(1)
.setName("一只猫")
.setSex(11)
.build())
.build();
break;
case 2:
msg = MessageInfo.MessageData.newBuilder()
.setMessageType(MessageInfo.MessageData.MessageType.Pig)
.setPig(MessageInfo.Pig.newBuilder()
.setAge(1)
.setName("一只猪")
.build())
.build();
break;
default:
break;
}
ctx.channel().writeAndFlush(msg);
}
}
1-3-7 maven依赖
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobufJavaVersion}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobufJavaUtilVersion}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${nettyAllVersion}</version>
</dependency>
</dependencies>
1-3-8 说明
- TestClientHandler
再验证客户端连接上服务段后我的发送消息写在回调函数里面,即是下面这段代码,我们在使用随机数来模拟随机发数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 这里模拟随机发送数据
int dataType = new Random().nextInt(3);
MessageInfo.MessageData msg = null;
switch (dataType) {
case 0:
msg = MessageInfo.MessageData.newBuilder()
.setMessageType(MessageInfo.MessageData.MessageType.Dog)
.setDog(MessageInfo.Dog.newBuilder()
.setAge(1)
.setName("一只狗")
.build())
.build();
break;
case 1:
msg = MessageInfo.MessageData.newBuilder()
.setMessageType(MessageInfo.MessageData.MessageType.Cat)
.setCat(MessageInfo.Cat.newBuilder()
.setAge(1)
.setName("一只猫")
.setSex(11)
.build())
.build();
break;
case 2:
msg = MessageInfo.MessageData.newBuilder()
.setMessageType(MessageInfo.MessageData.MessageType.Pig)
.setPig(MessageInfo.Pig.newBuilder()
.setAge(1)
.setName("一只猪")
.build())
.build();
break;
default:
break;
}
ctx.channel().writeAndFlush(msg);
}
}
- TestServerHandler 服务段的hanlder
我们在可以看到接受数据,加上数据类型的判断逻辑,将对应的数据类型的数据打印出来。
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageInfo.MessageData msg) {
MessageInfo.MessageData.MessageType messageType = msg.getMessageType();
if (messageType == MessageInfo.MessageData.MessageType.Dog) {
MessageInfo.Dog data = msg.getDog();
System.out.println(data.getAge());
System.out.println(data.getName());
} else if (messageType == MessageInfo.MessageData.MessageType.Cat) {
MessageInfo.Cat data = msg.getCat();
System.out.println(data.getAge());
System.out.println(data.getName());
System.out.println(data.getSex());
} else {
MessageInfo.Pig data = msg.getPig();
System.out.println(data.getAge());
System.out.println(data.getName());
}
}
1-4 日志
1-4-1 服务段日志
- 我们多次重启客户端,随机发送不同的数据类型,在日志新的异常是重启客户端打印出来的,不影响我们分析相关代码的逻辑。
- 下面的服务段的日志信息
十二月 17, 2020 11:22:08 上午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x1e972a2b] REGISTERED
十二月 17, 2020 11:22:08 上午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0x1e972a2b] BIND: 0.0.0.0/0.0.0.0:8899
十二月 17, 2020 11:22:08 上午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] ACTIVE
十二月 17, 2020 11:22:11 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0xf4e3bb4d, L:/127.0.0.1:8899 - R:/127.0.0.1:62042]
十二月 17, 2020 11:22:11 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
1
一只猪
十二月 17, 2020 11:22:16 上午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1147)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
十二月 17, 2020 11:22:19 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x4140980b, L:/127.0.0.1:8899 - R:/127.0.0.1:62087]
十二月 17, 2020 11:22:19 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
1
一只狗
十二月 17, 2020 11:22:24 上午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1147)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
十二月 17, 2020 11:22:27 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x44900222, L:/127.0.0.1:8899 - R:/127.0.0.1:62132]
十二月 17, 2020 11:22:27 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
1
一只狗
十二月 17, 2020 11:23:09 上午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1147)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
十二月 17, 2020 11:23:14 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x011ea4a4, L:/127.0.0.1:8899 - R:/127.0.0.1:62205]
十二月 17, 2020 11:23:14 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x1e972a2b, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
1
一只猫
11
版权声明:本文为qq_42641261原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。