public class ChatServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup=new NioEventLoopGroup(1);//处理连接请求
EventLoopGroup workerGroup=new NioEventLoopGroup();//默认线程数量为cpu核数的两倍,处理业务
try {
ServerBootstrap bootstrap=new ServerBootstrap();//创建服务器端的启动对象
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline channelPipeline=socketChannel.pipeline();
channelPipeline.addLast("decoder",new StringDecoder());//加解码器
channelPipeline.addLast("encoder",new StringEncoder());
channelPipeline.addLast(new CharServerHandler());
}
});
System.out.println("netty server start");
//启动服务器绑定端口,bind是异步操作,sync是等待
ChannelFuture cf=bootstrap.bind(9000).sync();
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}public class CharServerHandler extends SimpleChannelInboundHandler<String> {
//GlobalEventExecutor.INSTANCE:单例全局的一个事件执行器
private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 当客户端连接到服务端是触发
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel ch=ctx.channel();
//将有新客户端连接的消息发送给其他在线客户端
channelGroup.writeAndFlush(sdf.format(new Date())+" client:"+ch.remoteAddress()+" 上线了"+"\n");
channelGroup.add(ch);//加入新客户端
System.out.println("client:"+ch.remoteAddress()+" 上线了");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel ch=ctx.channel();
channelGroup.writeAndFlush(sdf.format(new Date())+" client:"+ch.remoteAddress()+" 下线了"+"\n");
channelGroup.remove(ch);
System.out.println("client:"+ch.remoteAddress()+" 下线了");
System.out.println("channelGroup size="+channelGroup.size());
}
/**
* 读取客户端的数据
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
Channel ch=channelHandlerContext.channel();
for (Channel channel:channelGroup){
if(channel!=ch){
channel.writeAndFlush(sdf.format(new Date())+" client:"+ch.remoteAddress()+" 发送了消息:"+msg+"\n");
}else{
//回显自己发送的消息
channel.writeAndFlush(sdf.format(new Date())+"【自己】发送了消息:"+msg+"\n");
}
}
}
}public class ChatClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group=new NioEventLoopGroup();
try {
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline channelPipeline=socketChannel.pipeline();
channelPipeline.addLast("decoder",new StringDecoder());//加解码器
channelPipeline.addLast("encoder",new StringEncoder());
channelPipeline.addLast(new ChatClientHandler());
}
});
//System.out.println("netty client start");
//启动客户端连接服务器
ChannelFuture cf =bootstrap.connect("127.0.0.1",9000).sync();
Scanner sc=new Scanner(System.in);
while (sc.hasNext()){
cf.channel().writeAndFlush(sc.nextLine());//发送消息到服务器
}
//关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s.trim());
}
}版权声明:本文为dingruibao原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。