一、服务端
public class EchoServer {
public static void main(String[] args) throws Exception{
// (1) 初始化用于Acceptor(接受消息)的主"线程池"以及用于I/O工作的从"线程池";
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 处理客户端连接的主线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理I/O的从线程池
try {
// (2) 初始化ServerBootstrap实例, 此实例是netty服务端应用开发的入口
ServerBootstrap boot = new ServerBootstrap();
// (3) 通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池"
boot.group(bossGroup, workerGroup);
// (4) 指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel
boot.channel(NioServerSocketChannel.class);
// (5) 设置ServerSocketChannel的处理器
boot.handler(new LoggingHandler());
// (6) 配置ServerSocketChannel的选项
boot.option(ChannelOption.SO_BACKLOG, 128);
// (7) 配置子通道也就是SocketChannel的选项
boot.childOption(ChannelOption.SO_KEEPALIVE, true);
// (8) 设置子通道也就是SocketChannel的处理器, 其内部是实际业务开发的"主战场"
boot.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline .addLast(new EchoServerHandler());
}
});
// Bind and start to accept incoming connections.
// (9) 绑定并侦听某个端口
ChannelFuture f = boot.bind("127.0.0.1", 8089).sync(); // (9)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully().sync();
bossGroup.shutdownGracefully().sync();
}
}
}
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当从客户端接受到一条消息时调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("Server received:"+in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
/**
* 最后一次调用channelRead()时触发
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
/**
* 在处理过程中发生异常时调用
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Server is Active......");
}
}
二、客户端
public void EchoClient {
public static void main(String[] args) throws Exception{
// (1) 初始化用于连接及I/O工作的线程池
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// (2) 初始化Bootstrap实例, 此实例是netty客户端应用开发的入口
Bootstrap b = new Bootstrap();
// (3) 通过Bootstrap的group方法,设置(1)中初始化的"线程池"
b.group(workerGroup);
// (4) 指定通道channel的类型,由于是客户端,故而是NioSocketChannel
b.channel(NioSocketChannel.class);
// (5) 设置SocketChannel的选项
b.option(ChannelOption.SO_KEEPALIVE, true);
// (6) 设置SocketChannel的处理器, 其内部是实际业务开发的"主战场"
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
// Start the client.
// (7) 连接指定的服务地址
ChannelFuture f = b.connect("127.0.0.1", 8089).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully().sync();
}
}
}
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 与服务器建立连接成功时被调用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client is active......");
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
/**
* 当从服务器接受到一条消息时调用
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception {
System.out.println("client received:"+ in.toString(CharsetUtil.UTF_8));
}
/**
* 在处理过程中发生异常时调用
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
版权声明:本文为qq_27870421原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。