netty服务器怎么返回响应,客户端(springmvc)调用netty构建的nio服务端,获得响应后返回页面(同步响应)...

后面考虑通过netty做一个真正意义的简约版RPC框架,今天先尝试通过正常调用逻辑调用netty构建的nio服务端并同步获得返回信息。为后面做铺垫

服务端实现

我们先完成服务端的逻辑,逻辑很简单,把客户端请求的内容加上服务器时间戳一并返回

public void run() throwsInterruptedException {

EventLoopGroup bossGroup= new NioEventLoopGroup(1); EventLoopGroup workGroup = newNioEventLoopGroup();try{

ServerBootstrap serverBootstrap= newServerBootstrap();

serverBootstrap.group(bossGroup,workGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG,4096)

.childHandler(new ChannelInitializer() {

@Overrideprotected void initChannel(SocketChannel ch) throwsException {

ch.pipeline().addLast(newLineBasedFrameDecoder(Integer.MAX_VALUE));

ch.pipeline().addLast(newStringDecoder());

ch.pipeline().addLast(newHandler());

}

});

System.out.println("服务启动"+port);

ChannelFuture channelFuture=serverBootstrap.bind(port).sync();

channelFuture.channel().closeFuture().sync();

}finally{

workGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

}

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {

String data=(String) msg;

System.out.println("服务端接收数据:" +data);

data= data + " now:"+System.currentTimeMillis();

ctx.writeAndFlush(Unpooled.copiedBuffer(data, CharsetUtil.UTF_8));

}

服务端用了LineBasedFrameDecoder,以防止半包读写问题,客户端需要进行配合

客户端实现

这个案例客户端实现有两个难点:

1、客户端方法如何能请求到SimpleChannelInboundHandler,即把需要发送的消息传到handler中

2、如何能等待服务端响应同步返回

第一个问题其实是如何把客户端输入的参数传入handler,所以我们需要把参数以构造函数传参的方式以此传入ChannelInitializer、SimpleChannelInboundHandler,这样handler就可以拿到客户端输入的数据了

下面的Controller里需要提前把channel准备好,如果RPC框架需要考虑通道与服务的关系

@RestControllerpublic classController {static String ip = "127.0.0.1";static int port = 9876;staticBootstrap bootstrap;staticNioEventLoopGroup worker;static{

bootstrap= newBootstrap();

worker= newNioEventLoopGroup();

bootstrap.group(worker);

bootstrap.channel(NioSocketChannel .class)

.option(ChannelOption.TCP_NODELAY,true)

.remoteAddress(newInetSocketAddress(ip, port));

}

@GetMapping("/nio/netty/server")publicString test(String param){

CustomerChannelInitializer customerChannelInitializer= newCustomerChannelInitializer(param);

bootstrap.handler(customerChannelInitializer);

ChannelFuture channelFuture= null;

String msg= "";try{

channelFuture=bootstrap.connect().sync();

}catch(InterruptedException e) {

e.printStackTrace();

}returncustomerChannelInitializer.getResponse();

}

}

public class CustomerChannelInitializer extends ChannelInitializer{privateString response;privateCountDownLatch countDownLatch;privateString param;privateClientChannelHandlerAdapter clientChannelHandlerAdapter;publicCustomerChannelInitializer(String param) {this.param =param;

}

@Overrideprotected void initChannel(SocketChannel ch) throwsException {

countDownLatch= new CountDownLatch(1);

clientChannelHandlerAdapter= new ClientChannelHandlerAdapter(param, this);

ChannelPipeline pipeline=ch.pipeline();

pipeline.addLast(clientChannelHandlerAdapter);

}publicString getResponse() {try{

countDownLatch.await();

}catch(InterruptedException e) {

e.printStackTrace();

}returnresponse;

}public voidsetResponse(String response) {this.response =response;

countDownLatch.countDown();

}

}

public class ClientChannelHandlerAdapter extends SimpleChannelInboundHandler{privateString param;privateCustomerChannelInitializer customerChannelInitializer;publicClientChannelHandlerAdapter(String param, CustomerChannelInitializer customerChannelInitializer) {this.param =param;this.customerChannelInitializer =customerChannelInitializer;

}

@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throwsException {

System.out.println("客户端收到返回数据:" +msg.toString(CharsetUtil.UTF_8));

customerChannelInitializer.setResponse(msg.toString(CharsetUtil.UTF_8));

}

@Overridepublic void channelActive(ChannelHandlerContext ctx) throwsException {

System.out.println("客户端准备发送数据");

ctx.writeAndFlush(Unpooled.copiedBuffer(param+ System.getProperty("line.separator"), CharsetUtil.UTF_8));

System.out.println("客户端发送数据完成");

}

@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {

System.out.println("发生异常");

cause.printStackTrace();

ctx.close();

}

}

我们来看第二个问题,由于netty是异步的,所以无法等待到服务端响应后调用客户端的channelRead0方法,controller就已经返回了,导致了网页显示的返回结果一直是空

主线程通过CountDownLatch来锁住没有返回结果的线程,直到工作线程获得结果并解锁

@Overrideprotected void initChannel(SocketChannel ch) throwsException {

countDownLatch= new CountDownLatch(1);

……publicString getResponse() {try{

countDownLatch.await();

}catch(InterruptedException e) {

e.printStackTrace();

}returnresponse;

}public voidsetResponse(String response) {this.response =response;

countDownLatch.countDown();

}