后面考虑通过netty做一个真正意义的简约版RPC框架,今天先尝试通过正常调用逻辑调用netty构建的nio服务端并同步获得返回信息。为后面做铺垫
服务端实现
我们先完成服务端的逻辑,逻辑很简单,把客户端请求的内容加上服务器时间戳一并返回
public void run() throwsInterruptedException {
EventLoopGroup bossGroup= new NioEventLoopGroup(1); EventLoopGroup workGroup = newNioEventLoopGroup();try{
ServerBootstrap serverBootstrap= newServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,4096)
.childHandler(new ChannelInitializer() {
@Overrideprotected void initChannel(SocketChannel ch) throwsException {
ch.pipeline().addLast(newLineBasedFrameDecoder(Integer.MAX_VALUE));
ch.pipeline().addLast(newStringDecoder());
ch.pipeline().addLast(newHandler());
}
});
System.out.println("服务启动"+port);
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}finally{
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
String data=(String) msg;
System.out.println("服务端接收数据:" +data);
data= data + " now:"+System.currentTimeMillis();
ctx.writeAndFlush(Unpooled.copiedBuffer(data, CharsetUtil.UTF_8));
}
服务端用了LineBasedFrameDecoder,以防止半包读写问题,客户端需要进行配合
客户端实现
这个案例客户端实现有两个难点:
1、客户端方法如何能请求到SimpleChannelInboundHandler,即把需要发送的消息传到handler中
2、如何能等待服务端响应同步返回
第一个问题其实是如何把客户端输入的参数传入handler,所以我们需要把参数以构造函数传参的方式以此传入ChannelInitializer、SimpleChannelInboundHandler,这样handler就可以拿到客户端输入的数据了
下面的Controller里需要提前把channel准备好,如果RPC框架需要考虑通道与服务的关系
@RestControllerpublic classController {static String ip = "127.0.0.1";static int port = 9876;staticBootstrap bootstrap;staticNioEventLoopGroup worker;static{
bootstrap= newBootstrap();
worker= newNioEventLoopGroup();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel .class)
.option(ChannelOption.TCP_NODELAY,true)
.remoteAddress(newInetSocketAddress(ip, port));
}
@GetMapping("/nio/netty/server")publicString test(String param){
CustomerChannelInitializer customerChannelInitializer= newCustomerChannelInitializer(param);
bootstrap.handler(customerChannelInitializer);
ChannelFuture channelFuture= null;
String msg= "";try{
channelFuture=bootstrap.connect().sync();
}catch(InterruptedException e) {
e.printStackTrace();
}returncustomerChannelInitializer.getResponse();
}
}
public class CustomerChannelInitializer extends ChannelInitializer{privateString response;privateCountDownLatch countDownLatch;privateString param;privateClientChannelHandlerAdapter clientChannelHandlerAdapter;publicCustomerChannelInitializer(String param) {this.param =param;
}
@Overrideprotected void initChannel(SocketChannel ch) throwsException {
countDownLatch= new CountDownLatch(1);
clientChannelHandlerAdapter= new ClientChannelHandlerAdapter(param, this);
ChannelPipeline pipeline=ch.pipeline();
pipeline.addLast(clientChannelHandlerAdapter);
}publicString getResponse() {try{
countDownLatch.await();
}catch(InterruptedException e) {
e.printStackTrace();
}returnresponse;
}public voidsetResponse(String response) {this.response =response;
countDownLatch.countDown();
}
}
public class ClientChannelHandlerAdapter extends SimpleChannelInboundHandler{privateString param;privateCustomerChannelInitializer customerChannelInitializer;publicClientChannelHandlerAdapter(String param, CustomerChannelInitializer customerChannelInitializer) {this.param =param;this.customerChannelInitializer =customerChannelInitializer;
}
@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throwsException {
System.out.println("客户端收到返回数据:" +msg.toString(CharsetUtil.UTF_8));
customerChannelInitializer.setResponse(msg.toString(CharsetUtil.UTF_8));
}
@Overridepublic void channelActive(ChannelHandlerContext ctx) throwsException {
System.out.println("客户端准备发送数据");
ctx.writeAndFlush(Unpooled.copiedBuffer(param+ System.getProperty("line.separator"), CharsetUtil.UTF_8));
System.out.println("客户端发送数据完成");
}
@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
System.out.println("发生异常");
cause.printStackTrace();
ctx.close();
}
}
我们来看第二个问题,由于netty是异步的,所以无法等待到服务端响应后调用客户端的channelRead0方法,controller就已经返回了,导致了网页显示的返回结果一直是空
主线程通过CountDownLatch来锁住没有返回结果的线程,直到工作线程获得结果并解锁
@Overrideprotected void initChannel(SocketChannel ch) throwsException {
countDownLatch= new CountDownLatch(1);
……publicString getResponse() {try{
countDownLatch.await();
}catch(InterruptedException e) {
e.printStackTrace();
}returnresponse;
}public voidsetResponse(String response) {this.response =response;
countDownLatch.countDown();
}