基于netty的可群聊聊天室

一.环境配置

<dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.39.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>

 netty是本项目的基础

lombok用于简化开发

gson用于序列化Json

logback用于提供日志

二.包结构

其中message用于定义服务端和客户端的消息种类

protocol用于定义双方之间的通信协议

config用于从配置文件中读取配置

server包下,handler包用于定义处理各类request消息的处理器

service提供UserService服务,用于登录校验

session提供会话服务,包括私聊和群聊

三.开发message块

首先抽象出一个Message父类

public abstract class Message implements Serializable {
    protected int messageType;
    protected int messageSequenceId;
    public abstract int getMessageT();
    public abstract int getMessageSequenceId();

    public static final int LoginRequest=1;//登录请求
    public static final int LoginResponse=2;//登录响应
    public static final int ChatRequest=3;//私聊请求
    public static final int ChatResponse=4;//私聊响应
    public static final int GroupChatRequestMessage=5;//群聊请求
    public static final int GroupChatResponseMessage=6;//群聊响应
    public static final int GroupCreatRequestMessage=7;//创建group请求
    public static final int GroupCreatResponseMessage=8;//创建group响应
    public static final int GroupDetailsRequestMessage=9;//group细节请求
    public static final int GroupDetailsResponseMessage=10;//group细节响应
    public static final int GroupJoinRequestMessage=11;//加入group请求
    public static final int GroupJoinResponseMessage=12;//加入group响应
    public static final int GroupQuitRequestMessage=13;//退出group请求
    public static final int GroupQuitResponseMessage=14;//退出group响应
    public static final int PingMessage=15;//客户端心跳消息,之后3s一次
    public static final int RpcRequestMessage=101;//rpc请求消息
    public static final int RpcResponseMessage=102;//rpc请求响应
    public static HashMap<Integer,Class<?>> MessageMap=new HashMap<>();


    static {
        MessageMap.put(1,LoginRequestMessage.class);
        MessageMap.put(2,LoginResponseMessage.class);
        MessageMap.put(3,ChatRequestMessage.class);
        MessageMap.put(4,ChatResponseMessage.class);
        MessageMap.put(5,GroupChatRequestMessage.class);
        MessageMap.put(6,GroupChatResponseMessage.class);
        MessageMap.put(7,GroupCreatRequestMessage.class);
        MessageMap.put(8,GroupCreatResponseMessage.class);
        MessageMap.put(9,GroupDetailsRequestMessage.class);
        MessageMap.put(10,GroupDetailsResponseMessage.class);
        MessageMap.put(11,GroupJoinRequestMessage.class);
        MessageMap.put(12,GroupJoinResponseMessage.class);
        MessageMap.put(13,GroupQuitRequestMessage.class);
        MessageMap.put(14,GroupQuitResponseMessage.class);
        MessageMap.put(15,PingMessage.class);
        MessageMap.put(101, com.ghc.message.RpcRequestMessage.class);
        MessageMap.put(102, RpcResponseMessage.class);
    }
}

主要有两个字段: messageType消息类型,用于区分消息,messageSequenceId,提供一个通信序列号。将消息类型和其对应的Class类存入map,便于后续Gson反序列化使用

实现响应的子类,rpc相关消息最后实现

 为每个消息添加独有的字段,为了简单,暂时不考虑序列号

/**
 * 聊天
 */
@Data
public class ChatRequestMessage extends Message{
    private String source;
    private String target;
    private String content;

    public ChatRequestMessage(String source, String target, String content) {
        this.source = source;
        this.target = target;
        this.content = content;
    }

    @Override
    public int getMessageT() {
        return Message.ChatRequest;
    }

    @Override
    public int getMessageSequenceId() {
        return 0;
    }
}

其他的消息也一样,加入自己特有的字段

四.开发序列化器

为了增强可拓展性,让序列化器可选择,这里定义一个Serializer接口,提供反序列方法和序列方法。

public interface Serializer {

    //反序列化方法
    <T> T deserialize(Class<T> clazz,byte[] bytes);

    //序列化方法
    <T> byte[] serialize(T object);
}

接着在其内部用枚举的方式定义JDK序列化器和Gson序列化器(注意要实现Serializer接口)

enum Algorithm implements Serializer{

        Java{
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                try {
                    return (T) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("反序列化失败",e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                    new ObjectOutputStream(bytes).writeObject(object);
                    return bytes.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("序列化失败",e);
                }
            }
        },
        Json{
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                String json = new String(bytes, StandardCharsets.UTF_8);
                return new Gson.fromJson(json,clazz);
            }

            @Override
            public <T> byte[] serialize(T object) {
                String json = new Gson.toJson(object);
                return json.getBytes(StandardCharsets.UTF_8);
            }
        }

提供一个Config类,读取application.properties中的序列化方式

public class Config {
    static Properties properties;

    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static int getServerPort(){
        return Integer.parseInt(properties.getProperty("server.port","8888"));
    }

    public static Serializer.Algorithm getSerializerAlgorithm(){
        return Serializer.Algorithm.valueOf(properties.getProperty("serializer.algorithm","Java"));
    }
}

 之后可以在别的地方调用getSerializerAlgorithm()方法得到具体的序列化器

 五.开发通信协议

首先继承netty中的MessageToMessageCodec类,实现其方法,之后加入NIOchannel的pipeline即可.

encode是编码,将消息对象转为byteBuf

@ChannelHandler.Sharable
/**
 * 必须和LengthFieldBasedFrameDecoder一起使用,确保传过来的bytebuf是完整的,不需要存储状态
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, List<Object> list) throws Exception {
        ByteBuf byteBuf = channelHandlerContext.alloc().buffer();
        //1. 4字节的魔数
        byteBuf.writeBytes(new byte[]{7, 7, 7, 7});
        //2. 1字节的版本
        byteBuf.writeByte(1);
        //3. 用1字节代表序列化的方式  0代表jdk,1代表json
        //枚举对象的ordinal()返回在类中的顺序下标,和数字对应起来
        byteBuf.writeByte(Config.getSerializerAlgorithm().ordinal());
        //4. 用1字节 代表指令类型 ,比如登录,注册,发消息,聊天
        byteBuf.writeByte(message.getMessageT());
        //写入一个对齐字节,无意义
        byteBuf.writeByte(0xff);
        //5. 4字节的指令序号
        byteBuf.writeInt(message.getMessageSequenceId());
        //6. 获取内容字节数组
        byte[] bytes = Config.getSerializerAlgorithm().serialize(message);

        //7.长度
        byteBuf.writeInt(bytes.length);

        //8.写入内容
        byteBuf.writeBytes(bytes);
        list.add(byteBuf);
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int magicNum = byteBuf.readInt();
        byte version = byteBuf.readByte();
        byte serializerAlgorithm = byteBuf.readByte();
        int messageType = byteBuf.readByte();
        byteBuf.readByte();
        int sequenceId = byteBuf.readInt();
        int length = byteBuf.readInt();
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);
        //1.根据传过来的字节找到应该用的序列化算法
        Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];

        //2.如果底层使用Gson,则须指定消息的实际类型,而不能是大Message抽象类
        Message message = (Message) algorithm.deserialize(Message.MessageMap.get(messageType), bytes);

        //加入list,以便下一个handler拿到结果
        list.add(message);
    }
}

六.实现Service层,包括UserService和Session,GroupSession

这里全部使用工厂模式,即先定义一个接口,再提供一个工厂,再提供一个具体的实现,用的时候通过工厂拿到具体的实现

 UserService

public interface UserService {
    /**
     *
     * @param username 用户名
     * @param password 密码
     * @return 登陆成功返回true,失败返回false
     */
    boolean login(String username,String password);
}

 实现校验登录功能,可以在具体的实现中写死用户名和密码,存在map中,只进行校验,也可以实现注册功能,提供一个注册register方法.

Session,用于维护用户和他的Channel,提供登陆绑定和退出解绑,和通过用户名拿到channel的方法(注意这里的channel是netty自己的channel) 

import io.netty.channel.Channel;

/**
 * 会话管理接口,把用户名和channel的信息进行管理
 */
public interface Session {
    /**
     * 绑定会话
     * @param channel 哪个channel要绑定会话
     * @param username 会话绑定用户
     */
    void bind(Channel channel,String username);

    /**
     * 解绑会话
     * @param channel 哪个channel要解绑会话
     */
    void unBind(Channel channel);

    /**
     * 根据用户名返回他连接的channel
     * @param username 哪个用户
     * @return 连接的channel
     */
    Channel getChannel(String username);

    public String getUsername(Channel channel);
}

创建一个Group类,封装群名和成员信息

@Data
public class Group {
    String name;
    Set<String> members;

    public Group(String name, Set<String> members) {
        this.name = name;
        this.members = members;
    }
}

GroupSession

/**
 * 聊天室管理接口
 */
public interface GroupSession {
    /**
     * 创建组,如果存在,返回null,
     * @param name group 的名字
     * @param members group 的成员
     * @return 返回组
     */
    Group createGroup(String name, Set<String> members);

    /**
     *
     * @param name 组名
     * @param member 成员名
     * @return 如果组不存在,返回null
     */
    Group joinMember(String name,String member);

    /**
     *
     * @param name 组名
     * @param member 成员名
     * @return 组不存在返回null
     */
    Group removeMember(String name,String member);

    /**
     *删除组
     * @param name 组名
     * @return 组名不存在返回null
     */
    Group removeGroup(String name);

    /**
     * 获得成员
     * @param name 名字
     * @return 返回集合
     */
    Set<String> getMembers(String name);

    /**
     * 获取组对应的channel
     * @param name 组名
     * @return channel集合
     */
    List<Channel> getChannels(String name);
}

七.编写服务器处理各种request请求的handler

这里可以继承 SimpleChannelInboundHandler类,它可以指定接收的消息泛型,这样在pipeline中,其他类型的消息不会传过来

例:

@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage msg) throws Exception {
        String source = msg.getSource();
        String target = msg.getTarget();
        Channel targetChannel = SessionFactory.getSession().getChannel(target);
        Channel sourceChannel = SessionFactory.getSession().getChannel(source);
        if(targetChannel!=null){
            targetChannel.writeAndFlush(ChatResponseMessage.success(source,target, msg.getContent()));
        }else {
            sourceChannel.writeAndFlush(ChatResponseMessage.fail("对方用户未上线或不存在,请重新发送信息"));
        }
    }
}

 注意:所有的handler都要加@Sharable注解,不然netty在pipeline中认为这是一个线程不共享的handler,只会用一次,下一次消息进来就不会经过这个handler了。

具体的业务逻辑不是很复杂,大家自行实现

八.编写服务器

由于handler是sharable共享的,所以只需要创建一份

LengthFieldBasedFrameDecoder 是基于长度的帧长解码器,用于解决半包粘包的问题,原理是   指定数据第几位是长度,然后解码器会等待接收的数据达到长度,一次性发送,接收的时候同理。

                  用来判断是不是读空闲时间过长 或者写时间过长
            //                            5s没有收到某个channel的数据,会触发相应地IdleState#READER_IDEL时间,需要自己处理
                            nioSocketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0));
                            //双向的handler,既入站又出站
                            nioSocketChannel.pipeline().addLast(new ChannelDuplexHandler() {
                                //用来触发用户产生的特殊事件
                                @Override
                                public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                    IdleStateEvent event = (IdleStateEvent) evt;
                                    if (event.state() == IdleState.READER_IDLE) {
                                        log.debug("5s没有读到数据");
                                    }
                                }
                            });

这两个handler,第一个是netty提供的定时处理器,第一个参数为5指的是如果5s对同一个channel没有读时间,就会向后发出一个 IdleStateEvent的时间,

而第二个handler的用处就是专门接收IdleStateEvent时间,然后做出响应,这里5s没有接收到数据,做出的响应是打印日志,由于客户端3s发送一个PingMessage,5s没收到也可以直接认定该channel关闭了,解除用户的绑定。

@Slf4j
public class ChatServer {
    public static void main(String[] args) {
        //抽取出来,每个channel共用同一个
        //messageCodec也是无状态的,只对这次传过来的数据处理,因此可以抽取出来
        LoggingHandler loggingHandler = new LoggingHandler();
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        MessageCodecSharable messageCodec = new MessageCodecSharable();
        LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
        ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
        GroupCreatRequestMessageHandler groupCreatRequestMessageHandler = new GroupCreatRequestMessageHandler();
        GroupDetailsRequestMessageHandler groupDetailsRequestMessageHandler = new GroupDetailsRequestMessageHandler();
        GroupChatRequestMessageHandler groupChatRequestMessageHandler = new GroupChatRequestMessageHandler();
        GroupQuitRequestMessageHandler groupQuitRequestMessageHandler = new GroupQuitRequestMessageHandler();
        QuitHandler quitHandler = new QuitHandler();
        RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler();
        try {
            ChannelFuture channelFuther = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {

                            nioSocketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
                            nioSocketChannel.pipeline().addLast(loggingHandler);
                            nioSocketChannel.pipeline().addLast(messageCodec);
//                            用来判断是不是读空闲时间过长 或者写时间过长
//                            5s没有收到某个channel的数据,会触发相应地IdleState#READER_IDEL时间,需要自己处理
                            nioSocketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0));
                            //双向的handler,既入站又出站
                            nioSocketChannel.pipeline().addLast(new ChannelDuplexHandler() {
                                //用来触发用户产生的特殊事件
                                @Override
                                public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                    IdleStateEvent event = (IdleStateEvent) evt;
                                    if (event.state() == IdleState.READER_IDLE) {
                                        System.out.println("5s没有读到数据");
                                    }
                                }
                            });
//                            利用SimpleChannelInboundHandler可以接收特定的消息类型,方便处理用户各种数据
                            nioSocketChannel.pipeline().addLast(loginRequestMessageHandler);
                            nioSocketChannel.pipeline().addLast(chatRequestMessageHandler);
                            nioSocketChannel.pipeline().addLast(groupCreatRequestMessageHandler);
                            nioSocketChannel.pipeline().addLast(groupDetailsRequestMessageHandler);
                            nioSocketChannel.pipeline().addLast(groupChatRequestMessageHandler);
                            nioSocketChannel.pipeline().addLast(groupQuitRequestMessageHandler);
                            nioSocketChannel.pipeline().addLast(quitHandler);
                            nioSocketChannel.pipeline().addLast(rpcRequestMessageHandler);
                        }
                    }).bind(8888).sync();
            channelFuther.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

九.编写客户端

public class ChatClient {
    /**
     * ?服务端的handler必须加sharable注解,不然不能启动多个客户端
     * @param args
     */

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        //用户输入线程和服务器响应nio线程间通信,每次响应触发用户输入,执行操作
        CountDownLatch signal = new CountDownLatch(1);
        AtomicBoolean LOGIN = new AtomicBoolean(true);
        try {
            new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
                    //nioSocketChannel.pipeline().addLast(new LoggingHandler());
                    nioSocketChannel.pipeline().addLast(new MessageCodecSharable());
                    //心跳,每3s向服务器发心跳数据包
                    //3s没有写事件,会触发相应地IdleState#WRITER_IDEL时间,然后向服务器发数据包
                    nioSocketChannel.pipeline().addLast(new IdleStateHandler(0, 3, 0));
                    //双向的handler,既入站又出站
                    nioSocketChannel.pipeline().addLast(new ChannelDuplexHandler() {
                        //用来触发用户产生的特殊事件
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                            IdleStateEvent event = (IdleStateEvent) evt;
                            if (event.state() == IdleState.WRITER_IDLE) {
                                //System.out.println("3s没有写数据,发送一个心跳包");
                                ctx.writeAndFlush(new PingMessage());
                            }
                        }
                    });
                    nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<LoginResponseMessage>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginResponseMessage loginResponseMessage) throws Exception {
                                System.out.println(loginResponseMessage.getReason());
                                if (!loginResponseMessage.isSuccess()) {
                                    channelHandlerContext.channel().close();
                                    LOGIN.set(false);
                                }
                                signal.countDown();
                            }
                    });
                }
            }).connect("localhost", 8888).sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
            System.out.println("已退出...");
        }
    }

}

为了让NIO线程专注于接收服务端发来的响应数据,这里用异步的方式,在连接刚建立的时候,新建一个线程专门处理用户的输入IO功能,做法是加入一个handler,重写他的channelActive方法,新建一个线程

nioSocketChannel.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
                        //在连接建立后触发active 事件
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //在一个新线程中接收用户控制台的输入,账户名和密码
                            //尽量减少原生的线程的使用
                            new Thread(() -> {
                                Scanner scanner = new Scanner(System.in);
                                System.out.println("请输入用户名:");
                                String username = scanner.nextLine();
                                System.out.println("请输入密码:");
                                String password = scanner.nextLine();
                                LoginRequestMessage message = new LoginRequestMessage(username, password);
                                ctx.writeAndFlush(message);
                                System.out.println("等待后续操作");
                                try {
                                    signal.await();
                                    if (!LOGIN.get()) {
                                        ctx.channel().close();
                                        return;
                                    }
                                    while (true) {
                                        System.out.println("====================================");
                                        System.out.println("send [username] [content]");
                                        System.out.println("gsend [group name] [content]");
                                        System.out.println("gcreate [group name] [m1,m2,m3...]");
                                        System.out.println("gmembers [groupname]");
                                        System.out.println("gjoin [group name]");
                                        System.out.println("gquit [group name]");
                                        System.out.println("quit");
                                        System.out.println("rpc [msg]");
                                        System.out.println("====================================");
                                        System.out.println("您的操作时:");
                                        String option = scanner.nextLine();
                                        String[] s = option.split(" ");
                                        switch (s[0]) {
                                            case "send":
                                                ChatRequestMessage chatRequestMessage = new ChatRequestMessage(username, s[1], s[2]);
                                                ctx.writeAndFlush(chatRequestMessage);
                                                break;
                                            case "gsend":
                                                GroupChatRequestMessage groupChatRequestMessage = new GroupChatRequestMessage(username, s[1], s[2]);
                                                ctx.writeAndFlush(groupChatRequestMessage);
                                                break;
                                            case "gcreate":
                                                String[] members = s[2].split(",");
                                                GroupCreatRequestMessage groupCreatRequestMessage = new GroupCreatRequestMessage(s[1], members);
                                                ctx.writeAndFlush(groupCreatRequestMessage);
                                                break;
                                            case "gmembers":
                                                ctx.writeAndFlush(new GroupDetailsRequestMessage(s[1]));
                                                break;
                                            case "gjoin":
                                                ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
                                                break;
                                            case "gquit":
                                                ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
                                                break;
                                            case "quit":
                                                System.out.println("退出系统.....");
                                                ctx.channel().close();
                                                return;
                                        }
                                    }
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }, "input").start();
}

为了让登录失败后结束程序,我们需要在两个线程中通信,这里选择用一个原子Boolean,和一个CountDownLatch 实现,原理是用户IO线程收到控制台输入的用户名和密码,向服务器发送,此时调用CountDownLatch的await()方法在此阻塞,客户端的NIO线程收到响应,如果登陆成功,就将标志设为true,然后调用CountDownLatch的countDown()方法,唤醒用户线程,用户线程根据标志的值是否为true,决定是否继续执行

十.基于netty的简单RPC

由于篇幅有限,下一章更新RPC


版权声明:本文为a11157原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。