项目需求:搭建服务与设备进行socket通信;与api服务端进行http请求(或websocket)通信传递消息;
项目环境:IDEA+Maven ;springboot;mybatis;netty;
1.服务端DiscardServer
/**
* 丢弃任何进入的数据 启动服务端的DiscardServerHandler
*/
@Component
public class DiscardServer{
public static class SingletionDiscardServer
{
static final DiscardServer instance=new DiscardServer();
}
public static DiscardServer getInstance(){
return SingletionDiscardServer.instance;
}
private EventLoopGroup bossGroup ;
private EventLoopGroup workerGroup;
private ServerBootstrap server;
private ChannelFuture future;
protected Logger logger = LoggerFactory.getLogger(getClass());
public void start(){
bossGroup = new NioEventLoopGroup();// 用于处理客户端的连接请求
workerGroup = new NioEventLoopGroup();//用于处理与各个客户端连接的 IO 操作
try {
server = new ServerBootstrap();
//绑定处理group
server = server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//定义可连接的客户端队列大小
.option(ChannelOption.SO_BACKLOG, 128)
//有数据立即发送;积攒一定数据量再发送用TCP_CORK
.option(ChannelOption.TCP_NODELAY, true)
//保持连接,2小时内无数据通信TCP发送探测报文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//允许重复使用本地地址和端口
.childOption(ChannelOption.SO_REUSEADDR, true)
//一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,
// 连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,
// 事件为ChannelInputShutdownEvent
.childOption(ChannelOption.ALLOW_HALF_CLOSURE, true)
//处理新连接
.childHandler(new ChildChannelHandler());
//异步地绑定服务器;
// 调用sync()方法阻塞等待直到绑定完成
future = server.bind(8090).sync();
logger.info("服务器启动开始监听端口: 8090");
//获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅退出,释放线程资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
2.ChildChannelHandler
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
protected void initChannel(SocketChannel socketChannel) throws Exception {
/* LineBasedFrameDecoder的工作原理是:依次遍历ByteBuf中的可读字节,
判断看其是否有”\n” 或 “\r\n”, 如果有就以此位置为结束位置。
从可读索引到结束位置的区间的字节就组成了一行。 它是以换行符为结束标志的解码器,
支持携带结束符和不带结束符两种解码方式,同时支持配置单行的最大长度,
如果读到了最大长度之后仍然没有发现换行符,则抛出异常,同时忽略掉之前读到的异常码流。*/
//socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new MyDecoder());//解码
socketChannel.pipeline().addLast(new StringEncoder());//编码
//设定 IdleStateHandler 心跳检测每五秒进行一次读检测,如果五秒内 ChannelRead()方法未被调用则触发一次 userEventTrigger()
//socketChannel.pipeline().addLast(new IdleStateHandler(60,60,60, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(group,new DiscardServerHandler());//客户端数据处理
}
}
3.DiscardServerHandler
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
protected Logger logger = LoggerFactory.getLogger(getClass());
private Map<ScheduledFuture<String>,ChannelHandlerContext> scheduledFutureList=new HashMap<>();
private static ChannelGroup channels =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static ChannelGroup getChannels() {
return channels;
}
public static void setChannels(ChannelGroup channels) {
DiscardServerHandler.channels = channels;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
logger.info(ctx.channel().remoteAddress()+"通道已注册");
channels.add(ctx.channel());
}
@Override
public void channelActive( ChannelHandlerContext ctx) throws Exception {
logger.info(ctx.channel().remoteAddress()+"通道已激活");
byte[] bytes= ConvertCode.hexString2Bytes( "0000000000");
ByteBuf message;
message=Unpooled.buffer(bytes.length);
message.writeBytes(bytes);
ctx.channel().writeAndFlush(message);
//心跳 非耗时任务
byte[] sendMsg=ConvertCode.hexString2Bytes("31");
ScheduledFuture<String> f= (ScheduledFuture<String>) ctx.channel().eventLoop().scheduleAtFixedRate(
new Runnable() {
public void run() {
byte[] bytes = ConvertCode.hexString2Bytes("31");
ByteBuf message;
message = Unpooled.buffer(bytes.length);
message.writeBytes(bytes);
ctx.channel().writeAndFlush(message);
}
}, 20, 20, TimeUnit.SECONDS);
scheduledFutureList.put(f,ctx);
//定时任务属于耗时任务
ctx.executor().execute(new Runnable() {
@Override
public void run() {
DiscardServerHandler.this.bussinessHandel(ctx,30);
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
byte[] msg1=(byte[])msg;
logger.info(ConvertCode.bytes2HexString(msg1));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info(ctx.channel().remoteAddress()+"已断开连接");
if (scheduledFutureList.size()>0)
{
for (Map.Entry<ScheduledFuture<String>,ChannelHandlerContext> entry : scheduledFutureList.entrySet()) {
if (entry.getValue().equals(ctx))
{
entry.getKey().cancel(false);
scheduledFutureList.remove(entry);
if (scheduledFutureList.size()==0) {
break;
}
}
}
}
ctx.channel();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出现异常就关闭
cause.printStackTrace();
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent){
IdleState event = ((IdleStateEvent) evt).state();
if (event==IdleState.READER_IDLE){
ctx.disconnect();
logger.info("心跳检测触发,socket连接断开!");
}
} else {
super.userEventTriggered(ctx,evt);
}
}
private void bussinessHandel(ChannelHandlerContext ctx,int rate)
{
final Channel channel = ctx.channel();
ScheduledFuture<String> future = (ScheduledFuture<String>) channel.eventLoop().scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
byte[] bytes=ConvertCode.hexString2Bytes("11111111");
ByteBuf message;
message=Unpooled.buffer(bytes.length);
message.writeBytes(bytes);
channel.writeAndFlush(message);
}
}, rate, rate, TimeUnit.SECONDS);
scheduledFutureList.put(future,ctx);
}
}
4.websocket端AppServiceClientHandle
@Component
@ChannelHandler.Sharable
public class AppServiceClientHandle extends SimpleChannelInboundHandler<Object>{
protected Logger logger = LoggerFactory.getLogger(getClass());
//由于继承了SimpleChannelInboundHandler,这个方法必须实现,否则报错
//但实际应用中,这个方法没被调用
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buff = (ByteBuf) msg;
String info = buff.toString(CharsetUtil.UTF_8);
logger.info("收到消息内容:"+info);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
if (msg instanceof FullHttpRequest){
//以http请求形式接入,但是走的是websocket
FullHttpRequest fullHttpRequest=(FullHttpRequest)msg;
if (fullHttpRequest.getMethod() == HttpMethod.GET) {
//处理get方式http请求
}
}
else if(msg instanceof WebSocketFrame){
//处理websocket客户端的消息
logger.info("收客户端消息");
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress reAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = reAddr.getAddress().getHostAddress();
String clientPort = String.valueOf(reAddr.getPort());
logger.info("连接断开:"+ clientIP +":"+ clientPort);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private void response(ChannelHandlerContext ctx, Content c) {
try {
// 1.设置响应
FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.copiedBuffer(JSONObject.toJSONString(c), CharsetUtil.UTF_8));
resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
// 2.发送
// 注意必须在使用完之后,close channel
if (ctx.channel().isActive()) {
logger.info("发送响应");
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
}
}catch (Exception e)
{
e.printStackTrace();
}
}
}
5.PipelineAdd
@Component
public class PipelineAdd {
public void websocketAdd(ChannelHandlerContext ctx){
// HttpServerCodec:将请求和应答消息解码为HTTP消息
ctx.pipeline().addBefore("commonhandler","http-codec",new HttpServerCodec());
// HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
ctx.pipeline().addBefore("commonhandler","aggregator",new HttpObjectAggregator(65535));
// ChunkedWriteHandler:向客户端发送HTML5文件,文件过大会将内存撑爆
ctx.pipeline().addBefore("commonhandler","http-chunked",new ChunkedWriteHandler());
ctx.pipeline().addBefore("commonhandler","WebSocketAggregator",new WebSocketFrameAggregator(65535));
//用于处理websocket, /ws为访问websocket时的uri
ctx.pipeline().addBefore("commonhandler","ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
}
}
6.SpringContextUtil@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @Description: 获取spring容器中的bean, 通过bean类型获取
*/
public static <T> T getBean(Class<T> beanClass) {
return applicationContext.getBean(beanClass);
}
}
7.MyDecoder解码器
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
byte[] b = new byte[in.readableBytes()];
//复制内容到字节数组b
in.readBytes(b);
out.add(b);
}
8.socket还是websocket选择器SocketChooseHandler**
* 协议初始化解码器. *
* 用来判定实际使用什么协议.</b> *
*/
@Component
public class SocketChooseHandler extends ByteToMessageDecoder {
/** 默认暗号长度为23 */
private static final int MAX_LENGTH = 23;
/** WebSocket握手的协议前缀 */
private static final String WEBSOCKET_PREFIX = "GET /";
@Resource
private SpringContextUtil springContextUtil;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String protocol = getBufStart(in);
if (protocol.startsWith(WEBSOCKET_PREFIX)) {
springContextUtil.getBean(PipelineAdd.class).websocketAdd(ctx);
ctx.pipeline().remove(DiscardServerHandler.class);
ctx.pipeline().remove(MyDecoder.class);
ctx.pipeline().remove(StringEncoder.class);
}
in.resetReaderIndex();
ctx.pipeline().remove(this.getClass());
}
private String getBufStart(ByteBuf in){
int length = in.readableBytes();
if (length > MAX_LENGTH) {
length = MAX_LENGTH;
}
// 标记读位置
in.markReaderIndex();
byte[] content = new byte[length];
in.readBytes(content);
return new String(content);
}
}9.ConvertCode是String和byte[]等类型转换辅助类可自行解决
以上该内容进攻自己记录和参考,希望对您有所帮助。
版权声明:本文为u011001843原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。