想理解一个架构的底层原理,必须先从最基础的实现开始
服务器端
public class NettyChatServer {
//端口号
private int port;
public NettyChatServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
//1. 创建bossGroup线程组: 处理网络事件--连接事件
EventLoopGroup bossGroup = null;
//2. 创建workerGroup线程组: 处理网络事件--读写事件 默认2*处理器线程数
EventLoopGroup workerGroup = null;
try {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//3. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//4. 设置bossGroup线程组和workerGroup线程组
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
.option(ChannelOption.SO_BACKLOG, 128)// 用于临时存放已完成三次握手的请求的队列的最大长度
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)// 启用心跳保活机制
.childHandler(new ChannelInitializer<SocketChannel>() { //7. 创建一个通道初始化对象 也可改造为WebSocketChanneInitializer
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//8. 向pipeline中添加自定义业务处理handler
//添加编解码器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
// todo
ch.pipeline().addLast(new NettyChatServerHandler());
}
});
//9. 启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
System.out.println("聊天室服务端启动成功.");
future.channel().closeFuture().sync();
} finally {
//10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyChatServer(9998).run();
}
}
NettyChatServerHandler 消息处理类
public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {
public static List<Channel> channelList = new ArrayList<Channel>();
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//当有新的客户端连接的时候, 将通道放入集合
channelList.add(channel);
System.out.println("[Server]:" +
channel.remoteAddress().toString().substring(1) + "在线.");
}
/**
* 通道未就绪--channel下线
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//当有客户端断开连接的时候,就移除对应的通道
channelList.remove(channel);
System.out.println("[Server]:" +
channel.remoteAddress().toString().substring(1) + "下线.");
}
/**
* 通道读取事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//当前发送消息的通道, 当前发送的客户端连接
Channel channel = ctx.channel();
for (Channel channel1 : channelList) {
//排除自身通道
if (channel != channel1) {
channel1.writeAndFlush("[" + channel.remoteAddress().toString().substring(1)
+ "]说:" + msg);
}
}
}
/**
* 异常处理事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
Channel channel = ctx.channel();
//移除集合
channelList.remove(channel);
System.out.println("[Server]:" +
channel.remoteAddress().toString().substring(1) + "异常.");
}
}
客户端
public class NettyClient {
private String ip;//服务端IP
private int port;//服务端端口号
public NettyClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
private void run() throws InterruptedException {
//创建线程组
EventLoopGroup group=new NioEventLoopGroup();//使用NioEventLoopGroup
//创建客户端启动助手
Bootstrap bootstrap =new Bootstrap();
//设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class)//设置客户端通道为NIO
.handler(new ChannelInitializer<SocketChannel>() {//创建初始化对象
//冲在初始化方法
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//向管道添加handler
socketChannel.pipeline().addLast(new StringDecoder());//string编码器
socketChannel.pipeline().addLast(new StringEncoder());//string解码器
//添加自定义处理类
socketChannel.pipeline().addLast(new NettyChatClientHandler());
}
});
// 启动客户端,等待连接服务端,同时将异步改为同步
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress().toString().substring(1) + "--------");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//向服务端发送消息
channel.writeAndFlush( msg);
}
// 关闭通道和关闭连接池
channelFuture.channel().closeFuture().sync();
}
public static void main(String[] args) throws InterruptedException {
new NettyClient("127.0.0.1", 9998).run();
}
}
消息处理类
public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 读通道就绪事件
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}
版权声明:本文为u010215318原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。