Netty简单聊天室实战

/**
 * Chat server entry point.
 *
 * <p>Binds a Netty server to port 9000 and gives every accepted connection a
 * String decoder, a String encoder and a fresh {@code CharServerHandler}.
 */
public class ChatServer {
    public static void main(String[] args) throws InterruptedException {
        // A single thread handles incoming connection requests.
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        // Serves the accepted connections; per the original note the default
        // thread count is twice the number of CPU cores.
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            // Builds the pipeline of each accepted client channel.
            ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel accepted) {
                    ChannelPipeline stages = accepted.pipeline();
                    stages.addLast("decoder", new StringDecoder());
                    stages.addLast("encoder", new StringEncoder());
                    stages.addLast(new CharServerHandler());
                }
            };

            // Server-side bootstrap object.
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptors, workers);
            server.channel(NioServerSocketChannel.class);
            server.option(ChannelOption.SO_BACKLOG, 1024);
            server.childHandler(pipelineSetup);

            System.out.println("netty server start");

            // bind() is asynchronous; sync() waits for the bind to finish.
            ChannelFuture bound = server.bind(9000).sync();

            // Keep main() blocked until the listening channel is closed.
            ChannelFuture closed = bound.channel().closeFuture();
            closed.sync();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
/**
 * Server-side chat handler.
 *
 * <p>Keeps every connected client channel in a shared {@link ChannelGroup},
 * announces clients going online/offline to the group, and relays each
 * received message to all group members.
 */
public class CharServerHandler extends SimpleChannelInboundHandler<String> {

    // All connected client channels, shared by every handler instance.
    // GlobalEventExecutor.INSTANCE is a global singleton event executor.
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // Pattern of the timestamp that prefixes every broadcast line.
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Formats the current time with {@link #TIMESTAMP_PATTERN}.
     *
     * <p>A new {@link SimpleDateFormat} is created on every call because the
     * class is not thread-safe, and handler instances sharing one static
     * formatter could call it from different event-loop threads at once.
     *
     * @return the current time as formatted text
     */
    private static String timestamp() {
        return new SimpleDateFormat(TIMESTAMP_PATTERN).format(new Date());
    }

    /**
     * Triggered when a client connects to the server.
     *
     * @param ctx context of the newly active channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel ch = ctx.channel();
        // Announce first, then add: the group does not yet contain the new
        // client, so only the other online clients receive the announcement.
        channelGroup.writeAndFlush(timestamp() + " client:" + ch.remoteAddress() + "  上线了" + "\n");
        channelGroup.add(ch);
        System.out.println("client:" + ch.remoteAddress() + "  上线了");
    }

    /**
     * Triggered when a client's channel becomes inactive.
     *
     * @param ctx context of the channel that went inactive
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel ch = ctx.channel();
        // Remove before broadcasting so the announcement is never written to
        // the channel that has just gone inactive.
        channelGroup.remove(ch);
        channelGroup.writeAndFlush(timestamp() + " client:" + ch.remoteAddress() + "  下线了" + "\n");
        System.out.println("client:" + ch.remoteAddress() + "  下线了");
        System.out.println("channelGroup size=" + channelGroup.size());
    }

    /**
     * Relays a message read from one client to every channel in the group.
     * The sender gets an echo marked as its own message; everyone else gets
     * the message tagged with the sender's remote address.
     *
     * @param channelHandlerContext context of the sending channel
     * @param msg the decoded text received from the client
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        Channel ch = channelHandlerContext.channel();
        // Build both variants once so every recipient sees the same timestamp.
        String stamp = timestamp();
        String forOthers = stamp + " client:" + ch.remoteAddress() + "  发送了消息:" + msg + "\n";
        String forSelf = stamp + "【自己】发送了消息:" + msg + "\n";
        for (Channel channel : channelGroup) {
            // Identity comparison: the group holds the very same Channel object.
            channel.writeAndFlush(channel != ch ? forOthers : forSelf);
        }
    }

}
public class ChatClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group=new NioEventLoopGroup();
        try {
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) {
                            ChannelPipeline channelPipeline=socketChannel.pipeline();
                            channelPipeline.addLast("decoder",new StringDecoder());//加解码器
                            channelPipeline.addLast("encoder",new StringEncoder());
                            channelPipeline.addLast(new ChatClientHandler());
                        }
                    });
            //System.out.println("netty client start");
            //启动客户端连接服务器
            ChannelFuture cf =bootstrap.connect("127.0.0.1",9000).sync();
            Scanner sc=new Scanner(System.in);
            while (sc.hasNext()){
                cf.channel().writeAndFlush(sc.nextLine());//发送消息到服务器
            }
            //关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
/**
 * Client-side inbound handler that prints each received String message.
 */
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Prints the received text to standard output after trimming leading and
     * trailing whitespace.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param s the received text
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        String received = s.trim();
        System.out.println(received);
    }
}


版权声明:本文为dingruibao原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。