一、应用场景
由于最近在做硬件通讯交互方面的项目,使用的是 Netty 作为中间件,存在以下两个应用场景:
1)业务平台通过 Netty 下发一条指令到终端设备,且终端设备需要上行指令确认,说明该终端设备收到该条指令。但是,存在网络抖动或终端设备突然失联等情况,此时,客户端并未收到该条指令。因此,期望服务端能够在指定时间间隔和重试次数的情况下重发未收到上行确认的指令。
2)周期性下发通讯指令,获得终端设备的近实时数据,如定位信息、终端状态等。
二、解决方案
提出以下两种解决方案,若读者们有更好的 ideas,欢迎评论交流。
1)全局定义一个周期性的定时任务,负责所有通道的定时任务。服务端绑定端口并成功监听后,通过 NioServerSocketChannel 获得 NioEventLoop,在该 NioEventLoop 中设置周期性的定时任务。
2)每个通讯通道各自设置周期性的定时任务。与第一个方案不同的是,该方案是通过 NioSocketChannel 获得 NioEventLoop,即每建立一个通讯通道,都会在该 NioEventLoop 创建一个定时任务。
三、示例程序
/**
* 需要重发消息的实体类
*/
public class ResendMessage implements Serializable {
private static final long serialVersionUID = 610633890747696854L;
/**
* 通讯通道
*/
private Channel channel;
/**
* 终端设备 ID
*/
private String terminalId;
/**
* 需要重发的消息
*/
private String message;
/**
* 已重发的次数
*/
private Integer retry = 0;
public ResendMessage() {
}
public ResendMessage(Channel channel, String terminalId, String message) {
this.channel = channel;
this.terminalId = terminalId;
this.message = message;
}
// 省略 getter 和 setter 方法
}
/**
* 服务端程序
*/
public class ResendServer {
private static final int port = 9999;
public static final DateTimeFormatter DTF_YMD_HMS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 终端设备ID <-> 需要重发的消息实体
public static final Map<String, ResendMessage> RESEND_CHANNEL_MAP = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ResendChannelInitializer());
ChannelFuture channelFuture = bootstrap.bind(port).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("ResendServer is listening on port: " + port);
Channel channel = channelFuture.channel();
// 设置周期性定时任务,10 秒后执行,每隔 10s 执行一次
channel.eventLoop().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (RESEND_CHANNEL_MAP.size() > 0) {
for (Map.Entry<String, ResendMessage> messageObjectEntry : RESEND_CHANNEL_MAP.entrySet()) {
ResendMessage resendMessage = messageObjectEntry.getValue();
// 重试次数
Integer retry = resendMessage.getRetry();
if (retry < 3) {
Channel clientChannel = resendMessage.getChannel();
// 重发消息
clientChannel.writeAndFlush(Unpooled.copiedBuffer(resendMessage.getMessage(), CharsetUtil.UTF_8));
resendMessage.setRetry(++retry);
System.out.println(DTF_YMD_HMS.format(LocalDateTime.now()) + ": 重发消息 -> " + resendMessage.getMessage());
}
}
}
}
}, 10, 10, TimeUnit.SECONDS);
}
});
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
/**
* 通道初始化配置
*/
public class ResendChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ResendChannelHandler());
}
}
/**
* 自定义 Handler 处理器
*/
public class ResendChannelHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("【通道】" + ctx.channel().id().asLongText() + " 建立\n");
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("where are you? I can not find you\n", CharsetUtil.UTF_8));
ResendMessage resendMessage = new ResendMessage(ctx.channel(),
"12345678",
"where are you? I can not find you\n"
);
ResendServer.RESEND_CHANNEL_MAP.put("12345678", resendMessage);
}
ResendServer.RESEND_CHANNEL_MAP.put("12345678", resendMessage);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {
SocketAddress socketAddress = ctx.channel().remoteAddress();
System.out.println("【客户端】地址为:" + socketAddress);
System.out.println("【客户端】发送消息:" + message);
String[] strArr = message.split(":");
if (Objects.nonNull(ResendServer.RESEND_CHANNEL_MAP.get(strArr[1].trim()))) {
ResendServer.RESEND_CHANNEL_MAP.remove(strArr[1].trim());
}
}
}
四、测试结果
本文用 TCP&UDP 测试工具在作为客户端进行通讯测试。启动服务端程序,并发起客户端连接,如下图所示:
服务端打印的记录如下图所示:

客户端通讯连接如下图所示:

版权声明:本文为weixin_41427129原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。