1、说明
因项目需要,要实现一个基于 socket 的同步请求客户端。恰好之前研究过 Netty:虽然 Netty 的线程模型本身并不适合同步、阻塞式通信,但为了省去另写一套 OIO 实现,这里直接用 Netty 实现了一版。有需要的同学可以直接使用,如有问题还请留言指正。
注意:本示例未涉及重连、心跳检测、客户端连接池等功能的实现。
2、代码
封装的被应用层调用的工具类
package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;
/**
 * Application-facing entry point that sends a JSON document through one shared
 * {@link NettyClient} and returns whatever that client hands back.
 *
 * <p>The client is started when this class is initialised and is closed by a
 * JVM shutdown hook.
 *
 * <p>Author: mazhongjia. Date: 2021/4/1 16:13. Version: 1.0.
 */
public class GraphDBRelevanceUtil {

    /** The single client instance shared by every caller of this class. */
    private static final NettyClient CLIENT = new NettyClient();

    /** Number of demo requests issued by {@link #main(String[])}. */
    private static final int DEMO_REQUEST_COUNT = 100;

    static {
        CLIENT.start();
        // Close the client when the JVM goes down.
        Thread releaseHook = new Thread(new Runnable() {
            @Override
            public void run() {
                CLIENT.close();
            }
        });
        Runtime.getRuntime().addShutdownHook(releaseHook);
    }

    /**
     * Sends one JSON request and blocks until the shared client returns.
     *
     * @param dataJson the JSON text to send
     * @return the value returned by {@link NettyClient#sendAndReceive(String)}
     */
    public static String graphDBRelevance(String dataJson) {
        String reply = CLIENT.sendAndReceive(dataJson);
        return reply;
    }

    /**
     * Demo driver: sends the same sample request repeatedly, prints each
     * reply, then terminates the JVM.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String request = sampleRequest();
        int remaining = DEMO_REQUEST_COUNT;
        while (remaining > 0) {
            System.out.println(graphDBRelevance(request));
            remaining--;
        }
        System.exit(0);
    }

    /** Builds the sample JSON request used by the demo driver. */
    private static String sampleRequest() {
        return "{\n" +
                "\t\"tableName\": \"powertransformer\",\n" +
                "\t\"stId\": \"123\",\n" +
                "\t\"vlId\": \"123\",\n" +
                "\t\"coding\": \"1\",\n" +
                "\t\"naming\": \"测试\",\n" +
                "\t\"extend\": {\n" +
                "\t\t\"midvlId\": \"123\",\n" +
                "\t\t\"lowvlId\": \"123\"\n" +
                "\t}\n" +
                "}";
    }
}
netty客户端
package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;
/**
* @Auther: mazhongjia
* @Date: 2021/4/1 14:17
* @Version: 1.0
*/
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.Attribute;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * Blocking request/response client on top of a single Netty channel.
 *
 * <p>Requests are serialised with a one-permit semaphore, so at most one
 * request is outstanding on the channel at any time. Each request is written
 * as a 4-byte big-endian length followed by the UTF-8 encoded JSON text.
 *
 * <p>{@code start} is meant to be called once per instance: the same handler
 * instance is added to every pipeline this client creates.
 */
public class NettyClient {
    /**
     * Maximum time to wait for a reply, in seconds.
     */
    private static final int MAX_AWAIT_TIME = 60;
    /** Endpoint used by the no-argument {@link #start()}. */
    private static final String DEFAULT_HOST = "localhost";
    private static final int DEFAULT_PORT = 8081;
    /** Size in bytes of the length prefix in front of every frame. */
    private static final int LENGTH_FIELD_BYTES = 4;
    /** Looked up once; avoids the checked exception of the by-name String APIs. */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    private final EventLoopGroup group = new NioEventLoopGroup(1);
    /** Allows a single in-flight request per client. */
    private final Semaphore semaphore = new Semaphore(1);
    private final GraphDBRelevanceHandler handler = new GraphDBRelevanceHandler();
    /** Null until a connection has been established; read by request threads. */
    private volatile Channel channel;

    /**
     * Connects to the default endpoint ({@code localhost:8081}).
     */
    public void start() {
        start(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Connects to the given endpoint and blocks until the connect attempt
     * completes. If the calling thread is interrupted while waiting, the
     * failure is reported, the interrupt flag is restored and the client is
     * left unconnected. Any other connect failure propagates from
     * {@code sync()} to the caller.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public void start(String host, int port) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // NOTE(review): initialBytesToStrip is 0, so the 4-byte length prefix is
                        // kept in the frame and reaches StringDecoder as part of the text.
                        // Confirm whether callers rely on that before changing it to 4.
                        socketChannel.pipeline().addLast(
                                new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, LENGTH_FIELD_BYTES, 0, 0));
                        socketChannel.pipeline().addLast(new StringDecoder(UTF_8));
                        socketChannel.pipeline().addLast(handler);
                    }
                });
        ChannelFuture future;
        try {
            future = bootstrap.connect(host, port).sync();
        } catch (InterruptedException e) {
            System.out.println("【客户端】连接失败....");
            e.printStackTrace();
            // Keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
            return;
        }
        // Initialise the per-channel map that holds the pending callback.
        Attribute<Map<String, Object>> attribute = future.channel().attr(ChannelUtils.DATA_MAP_ATTRIBUTEKEY);
        attribute.set(new ConcurrentHashMap<String, Object>());
        System.out.println("【客户端】连接成功....");
        // Publish the channel last so request threads only ever see an initialised one.
        channel = future.channel();
    }

    /**
     * Shuts down the event loop group used by this client.
     */
    public void close() {
        System.out.println("【客户端】资源释放....");
        group.shutdownGracefully();
    }

    /**
     * Sends one request and blocks until the reply arrives or
     * {@link #MAX_AWAIT_TIME} seconds have passed.
     *
     * @param jsonData JSON text to send; must not be null
     * @return the reply converted with {@code toString()}, or {@code null} when
     *         the client is not connected, no reply arrived in time, or the
     *         calling thread was interrupted
     */
    public String sendAndReceive(String jsonData) {
        // Acquire outside the try/finally below: a failed acquire must not release
        // a permit it never obtained, or two requests could run concurrently.
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            System.out.println("【客户端】发送关联报文被中断....");
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return null;
        }
        try {
            Channel current = channel;
            if (current == null) {
                System.out.println("【客户端】尚未建立连接,无法发送关联报文....");
                return null;
            }
            // 1. Encode first, so a bad argument cannot leave a callback registered.
            ByteBuf sendData = buildSendData(jsonData);
            // 2. Register the callback that the inbound handler completes.
            final CallbackService callbackService = new CallbackService();
            ChannelUtils.putCallback2DataMap(current, callbackService);
            // 3. Send the request.
            current.writeAndFlush(sendData);
            // 4. Block until the reply arrives or the wait times out.
            callbackService.awaitThread(MAX_AWAIT_TIME, TimeUnit.SECONDS);
            // 5. A null result means nothing was delivered within the wait.
            Object data = callbackService.getData();
            if (data == null) {
                System.out.println("【客户端】等待服务端返回结果超时....");
                return null;
            }
            return data.toString();
        } catch (InterruptedException e) {
            System.out.println("【客户端】发送关联报文被中断....");
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return null;
        } finally {
            semaphore.release();
        }
    }

    /**
     * Encodes a request as a 4-byte length followed by the UTF-8 bytes of the text.
     *
     * @param jsonData JSON text to encode
     * @return a buffer sized for the prefix plus the payload
     */
    private ByteBuf buildSendData(String jsonData) {
        byte[] dataByte = jsonData.getBytes(UTF_8);
        // Reserve room for the prefix too, so the buffer does not have to grow.
        ByteBuf message = Unpooled.buffer(LENGTH_FIELD_BYTES + dataByte.length);
        message.writeInt(dataByte.length);
        message.writeBytes(dataByte);
        return message;
    }
}
业务handler
package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;
/**
* @Auther: mazhongjia
* @Date: 2021/4/1 14:18
* @Version: 1.0
*/
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Inbound handler that hands each decoded server reply to the callback
 * currently registered on the channel.
 */
public class GraphDBRelevanceHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Logs that the connection is up.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("【客户端】连接成功....");
    }

    /**
     * Delivers one reply to the pending callback and unregisters that callback.
     * A reply that arrives while no callback is registered is logged and dropped.
     *
     * @param ctx  context of the channel the reply arrived on
     * @param body decoded reply text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String body) throws Exception {
        System.out.println("【客户端】收到一次服务端返回结果:Now is : "+ body );
        CallbackService callbackService = ChannelUtils.<CallbackService>removeCallback(ctx.channel());
        if (callbackService == null) {
            // Without this guard the NullPointerException would reach exceptionCaught
            // and close an otherwise healthy channel.
            System.out.println("【客户端】没有等待中的请求,丢弃本次服务端返回结果....");
            return;
        }
        callbackService.receiveMessage(body);
    }

    /**
     * Logs the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("【客户端】连接【服务端】通道遇到未知异常,关闭当前Channel....");
        cause.printStackTrace();
        ctx.close();
    }
}
请求与响应结果的映射工具(用于异步结果回调)
package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import java.util.Map;
/**
 * Helpers that keep the pending request callback in a map stored as a channel
 * attribute, so the inbound handler can find it when the reply arrives.
 *
 * <p>Only one slot ({@link #KEY}) is used, so a channel holds at most one
 * pending callback at a time.
 *
 * <p>Author: mazhongjia. Date: 2021/4/1 17:13. Version: 1.0.
 */
public class ChannelUtils {
    /** Attribute under which the per-channel data map is stored. */
    public static final AttributeKey<Map<String, Object>> DATA_MAP_ATTRIBUTEKEY = AttributeKey.valueOf("dataMap");
    /** Map key of the single pending-callback slot. */
    public static final String KEY = "key";

    /**
     * Registers {@code callback} as the pending callback of {@code channel},
     * replacing any previous one. The data map is created on demand if the
     * channel does not have one yet.
     *
     * @param channel  channel the request is sent on
     * @param callback callback to complete when the reply arrives
     * @param <T>      callback type
     */
    public static <T> void putCallback2DataMap(Channel channel, T callback) {
        Map<String, Object> dataMap = channel.attr(DATA_MAP_ATTRIBUTEKEY).get();
        if (dataMap == null) {
            Map<String, Object> created = new java.util.concurrent.ConcurrentHashMap<String, Object>();
            // setIfAbsent returns the existing value when another thread installed one first.
            Map<String, Object> existing = channel.attr(DATA_MAP_ATTRIBUTEKEY).setIfAbsent(created);
            dataMap = existing != null ? existing : created;
        }
        dataMap.put(KEY, callback);
    }

    /**
     * Removes and returns the pending callback of {@code channel}.
     *
     * @param channel channel the reply arrived on
     * @param <T>     type the caller expects the callback to have
     * @return the removed callback, or {@code null} when none is registered or
     *         the channel has no data map
     */
    // The map stores Object; the caller chooses T and is responsible for it matching what was put.
    @SuppressWarnings("unchecked")
    public static <T> T removeCallback(Channel channel) {
        Map<String, Object> dataMap = channel.attr(DATA_MAP_ATTRIBUTEKEY).get();
        if (dataMap == null) {
            return null;
        }
        return (T) dataMap.remove(KEY);
    }
}
同步控制器
package com.mzj.netty.protocol.decoder._03_LengthFieldBasedFrameDecoder.json;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Lets a requesting thread block until another thread delivers the result.
 *
 * <p>Author: mazhongjia. Date: 2021/4/1 16:13. Version: 1.0.
 */
public class CallbackService {

    /**
     * Result delivered by {@link #receiveMessage(Object)}; stays {@code null}
     * until then. Typed as Object for now; to be narrowed later.
     */
    private volatile Object result;

    /**
     * Opens once a result has been delivered.
     */
    private final CountDownLatch delivered = new CountDownLatch(1);

    /**
     * Returns the delivered result.
     *
     * @return the result, or {@code null} if none has been delivered
     */
    public Object getData() {
        return this.result;
    }

    /**
     * Stores the result and wakes any thread blocked in
     * {@link #awaitThread(int, TimeUnit)}.
     *
     * @param data the result to hand over
     */
    public void receiveMessage(Object data) throws Exception {
        // Store before counting down so a woken waiter sees the value.
        this.result = data;
        this.delivered.countDown();
    }

    /**
     * Blocks until a result has been delivered or the timeout elapses,
     * whichever comes first.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void awaitThread(int timeout, TimeUnit unit) throws InterruptedException {
        this.delivered.await((long) timeout, unit);
    }
}
版权声明:本文为mazhongjia原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。