netty-实现同步socket示例

1、说明

因项目需要,要实现一个 socket 同步请求的客户端。恰好之前研究过 netty,虽然 netty 的线程模型本身并不适合同步、阻塞式通信,但也懒得再写一套 OIO,就直接用它实现了一下。有需要的同学可以参考使用,有什么问题还请留言指正。

本示例并未涉及重连、心跳检测、客户端连接池等实现。

2、代码

封装的被应用层调用的工具类

package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;

/**
 * Application-facing entry point that forwards JSON requests to one shared {@link NettyClient}.
 *
 * <p>The shared client is started in this class's static initialiser and is closed from a JVM
 * shutdown hook registered in the same place.
 *
 * <p>Date: 2021/4/1 16:13
 *
 * @author mazhongjia
 * @version 1.0
 */
public class GraphDBRelevanceUtil {

    private static final NettyClient CLIENT = new NettyClient();

    static {
        CLIENT.start();

        // Close the client when the JVM shuts down.
        Thread releaseOnExit = new Thread(new Runnable() {
            @Override
            public void run() {
                CLIENT.close();
            }
        });
        Runtime.getRuntime().addShutdownHook(releaseOnExit);
    }

    /**
     * Sends one JSON request through the shared client and returns whatever the client returns.
     *
     * @param dataJson request body as a JSON string
     * @return the value produced by {@link NettyClient#sendAndReceive(String)}
     */
    public static String graphDBRelevance(String dataJson) {
        String reply = CLIENT.sendAndReceive(dataJson);
        return reply;
    }

    /**
     * Demo driver: sends the same sample request 100 times, prints each reply, then exits.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        StringBuilder json = new StringBuilder();
        json.append("{\n");
        json.append("\t\"tableName\": \"powertransformer\",\n");
        json.append("\t\"stId\": \"123\",\n");
        json.append("\t\"vlId\": \"123\",\n");
        json.append("\t\"coding\": \"1\",\n");
        json.append("\t\"naming\": \"测试\",\n");
        json.append("\t\"extend\": {\n");
        json.append("\t\t\"midvlId\": \"123\",\n");
        json.append("\t\t\"lowvlId\": \"123\"\n");
        json.append("\t}\n");
        json.append("}");
        String payload = json.toString();

        int remaining = 100;
        while (remaining > 0) {
            System.out.println(graphDBRelevance(payload));
            remaining--;
        }

        // Explicit exit so the shutdown hook runs and the event loop threads are stopped.
        System.exit(0);
    }
}

netty客户端

package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;

/**
 * @Auther: mazhongjia
 * @Date: 2021/4/1 14:17
 * @Version: 1.0
 */

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.Attribute;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Netty-based TCP client that exposes a blocking request/response call over a single channel.
 *
 * <p>Wire format in both directions: a 4-byte length prefix followed by the UTF-8 encoded body.
 * A single-permit semaphore serialises callers, so at most one request is in flight at a time.
 * Reconnect, heartbeat and connection pooling are not implemented.
 */
public class NettyClient {

    /**
     * Maximum time to wait for a response, in seconds.
     */
    private static final int MAX_AWAIT_TIME = 60;

    /**
     * Size in bytes of the length prefix that precedes every frame.
     */
    private static final int LENGTH_FIELD_SIZE = 4;

    /**
     * Charset used for both the outgoing body and the incoming body.
     */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    private final EventLoopGroup group = new NioEventLoopGroup(1);
    private final Semaphore semaphore = new Semaphore(1);

    private final GraphDBRelevanceHandler handler = new GraphDBRelevanceHandler();

    private Channel channel;

    /**
     * Connects to {@code localhost:8081} and prepares the channel for request/response use.
     *
     * <p>If the calling thread is interrupted while connecting, the interrupt flag is restored and
     * the method returns with no channel set. Any other connect failure is rethrown by
     * {@code ChannelFuture.sync()} and propagates to the caller.
     */
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Strip the length prefix (initialBytesToStrip = 4): with 0 the four
                        // header bytes stay in the frame and StringDecoder would turn them into
                        // garbage characters at the front of the JSON body.
                        socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(
                                Integer.MAX_VALUE, 0, LENGTH_FIELD_SIZE, 0, LENGTH_FIELD_SIZE));
                        socketChannel.pipeline().addLast(new StringDecoder(UTF_8));
                        socketChannel.pipeline().addLast(handler);
                    }
                });

        ChannelFuture future = null;
        try {
            future = bootstrap.connect("localhost", 8081).sync();

            // Initialise the per-channel map that holds the callback of the pending request.
            Attribute<Map<String, Object>> attribute = future.channel().attr(ChannelUtils.DATA_MAP_ATTRIBUTEKEY);
            ConcurrentHashMap<String, Object> dataMap = new ConcurrentHashMap<>();
            attribute.set(dataMap);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            System.out.println("【客户端】连接失败....");
            e.printStackTrace();
            return;
        }
        System.out.println("【客户端】连接成功....");
        channel = future.channel();
    }

    /**
     * Shuts down the event loop group that backs this client.
     */
    public void close() {
        System.out.println("【客户端】资源释放....");
        group.shutdownGracefully();
    }

    /**
     * Sends one JSON request and blocks until the response arrives or the wait times out.
     *
     * @param jsonData request body; must not be {@code null}
     * @return the response body, or {@code null} when the channel is not connected, the calling
     *         thread is interrupted, or no response arrives within {@link #MAX_AWAIT_TIME} seconds
     */
    public String sendAndReceive(String jsonData) {
        final Channel target = channel;
        if (target == null || !target.isActive()) {
            // Fail fast instead of hitting a NullPointerException or waiting out the full timeout.
            System.out.println("【客户端】通道未建立或已关闭,无法发送关联报文....");
            return null;
        }

        // Acquire OUTSIDE the try/finally below: releasing a permit that was never acquired
        // would raise the permit count above one and let two requests run concurrently.
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("【客户端】发送关联报文被中断....");
            e.printStackTrace();
            return null;
        }

        try {
            // 1. Register the callback that the inbound handler completes with the response.
            final CallbackService callbackService = new CallbackService();
            ChannelUtils.putCallback2DataMap(target, callbackService);

            // 2. Build and send the length-prefixed request.
            target.writeAndFlush(buildSendData(jsonData));

            // 3. Block until the response arrives or the timeout elapses.
            callbackService.awaitThread(MAX_AWAIT_TIME, TimeUnit.SECONDS);

            // 4. No data after the wait means the wait timed out.
            // NOTE(review): the callback stays registered after a timeout, so a late response
            // may be handed to the next request; fixing that needs request ids in the protocol.
            Object data = callbackService.getData();
            if (data == null) {
                System.out.println("【客户端】等待服务端返回结果超时....");
                return null;
            }
            return data.toString();

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("【客户端】发送关联报文被中断....");
            e.printStackTrace();
            return null;
        } finally {
            semaphore.release();
        }
    }

    /**
     * Encodes a request as a 4-byte length prefix followed by the UTF-8 bytes of the body.
     *
     * @param jsonData request body
     * @return a buffer holding the complete frame, ready to be written to the channel
     */
    private ByteBuf buildSendData(String jsonData) {

        byte[] dataByte = jsonData.getBytes(UTF_8);

        // Size the buffer for header + body up front so it does not have to grow.
        ByteBuf message = Unpooled.buffer(LENGTH_FIELD_SIZE + dataByte.length);
        message.writeInt(dataByte.length);
        message.writeBytes(dataByte);

        return message;
    }
}

业务handler

package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;

/**
 * @Auther: mazhongjia
 * @Date: 2021/4/1 14:18
 * @Version: 1.0
 */
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Inbound handler that hands each decoded response string to the callback registered on the
 * channel and closes the channel when an exception reaches it.
 */
public class GraphDBRelevanceHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Logs that the connection is established and passes the event on.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("【客户端】连接成功....");
        super.channelActive(ctx);
    }

    /**
     * Completes the pending callback, if any, with the received response body.
     *
     * @param ctx  context of the channel the response arrived on
     * @param body decoded response body
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String body) throws Exception {

        System.out.println("【客户端】收到一次服务端返回结果:Now is : "+ body );

        CallbackService callbackService = ChannelUtils.<CallbackService>removeCallback(ctx.channel());
        if (callbackService == null) {
            // No request is waiting (e.g. a duplicate or unsolicited message). Drop it rather
            // than throw a NullPointerException, which would end up closing the channel.
            System.out.println("【客户端】收到无对应请求的服务端返回结果,已丢弃....");
            return;
        }
        callbackService.receiveMessage(body);
    }

    /**
     * Logs the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("【客户端】连接【服务端】通道遇到未知异常,关闭当前Channel....");
        cause.printStackTrace();
        ctx.close();
    }
}

通道与回调的映射工具(用于保存、取出等待服务端响应的回调对象)

package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

import java.util.Map;

/**
 * Helpers that store and retrieve, per Netty channel, the callback waiting for a response.
 *
 * <p>The callback lives in a map held in the channel attribute {@link #DATA_MAP_ATTRIBUTEKEY},
 * always under the single fixed key {@link #KEY}, so a channel holds at most one callback.
 *
 * <p>Date: 2021/4/1 17:13
 *
 * @author mazhongjia
 * @version 1.0
 */
public class ChannelUtils {

    public static final AttributeKey<Map<String, Object>> DATA_MAP_ATTRIBUTEKEY = AttributeKey.valueOf("dataMap");

    public static final String KEY = "key";

    /**
     * Registers {@code callback} on the channel, replacing any callback already registered.
     *
     * @param channel  channel to register the callback on
     * @param callback callback to store
     * @param <T>      callback type
     */
    public static <T> void putCallback2DataMap(Channel channel, T callback) {
        dataMapOf(channel).put(KEY, callback);
    }

    /**
     * Removes and returns the callback registered on the channel.
     *
     * @param channel channel to take the callback from
     * @param <T>     type the caller expects the callback to have
     * @return the removed callback, or {@code null} if none is registered
     */
    public static <T> T removeCallback(Channel channel) {
        Map<String, Object> dataMap = channel.attr(DATA_MAP_ATTRIBUTEKEY).get();
        if (dataMap == null) {
            // The attribute was never initialised, so nothing can be registered.
            return null;
        }
        // The map stores Object; the caller chooses T, so this cast cannot be checked here.
        @SuppressWarnings("unchecked")
        T callback = (T) dataMap.remove(KEY);
        return callback;
    }

    /**
     * Returns the channel's data map, creating and attaching one if the attribute is still unset.
     */
    private static Map<String, Object> dataMapOf(Channel channel) {
        Map<String, Object> dataMap = channel.attr(DATA_MAP_ATTRIBUTEKEY).get();
        if (dataMap == null) {
            Map<String, Object> fresh = new java.util.concurrent.ConcurrentHashMap<String, Object>();
            // setIfAbsent returns the existing value when another thread attached a map first.
            Map<String, Object> existing = channel.attr(DATA_MAP_ATTRIBUTEKEY).setIfAbsent(fresh);
            dataMap = existing != null ? existing : fresh;
        }
        return dataMap;
    }
}

同步控制器

package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * One-shot rendezvous between a thread waiting for a response and the thread delivering it.
 *
 * <p>The waiting thread blocks in {@link #awaitThread(int, TimeUnit)}; the delivering thread calls
 * {@link #receiveMessage(Object)}, which stores the result and releases the waiter.
 *
 * <p>Date: 2021/4/1 16:13
 *
 * @author mazhongjia
 * @version 1.0
 */
public class CallbackService {

    /**
     * Released once, when the result has been stored.
     */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * The delivered result (typed as Object for now; to be narrowed in later development).
     */
    private volatile Object data;

    /**
     * Blocks until a result has been delivered or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if a result was delivered, {@code false} if the wait timed out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean awaitThread(int timeout, TimeUnit unit) throws InterruptedException {
        // Surface the latch's own result so callers can tell a timeout from a delivered result.
        return latch.await(timeout, unit);
    }

    /**
     * Stores the result and releases the waiting thread.
     *
     * @param data the result to hand to the waiter
     */
    public void receiveMessage(Object data) throws Exception {
        this.data = data;
        // Release the calling thread only after the result is stored.
        latch.countDown();
    }

    /**
     * Returns the delivered result.
     *
     * @return the result, or {@code null} if none has been delivered yet
     */
    public Object getData() {
        return data;
    }
}

 


版权声明:本文为mazhongjia原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。