一.环境配置
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
netty是本项目的基础
lombok用于简化开发
gson用于序列化Json
logback用于提供日志
二.包结构
其中message用于定义服务端和客户端的消息种类
protocol用于定义双方之间的通信协议
config用于从配置文件中读取配置
server包下,handler包用于定义处理各类request消息的处理器
service提供UserService服务,用于登录校验
session提供会话服务,包括私聊和群聊
三.开发message块
首先抽象出一个Message父类
public abstract class Message implements Serializable {
protected int messageType;
protected int messageSequenceId;
public abstract int getMessageT();
public abstract int getMessageSequenceId();
public static final int LoginRequest=1;//登录请求
public static final int LoginResponse=2;//登录响应
public static final int ChatRequest=3;//私聊请求
public static final int ChatResponse=4;//私聊响应
public static final int GroupChatRequestMessage=5;//群聊请求
public static final int GroupChatResponseMessage=6;//群聊响应
public static final int GroupCreatRequestMessage=7;//创建group请求
public static final int GroupCreatResponseMessage=8;//创建group响应
public static final int GroupDetailsRequestMessage=9;//group细节请求
public static final int GroupDetailsResponseMessage=10;//group细节响应
public static final int GroupJoinRequestMessage=11;//加入group请求
public static final int GroupJoinResponseMessage=12;//加入group响应
public static final int GroupQuitRequestMessage=13;//退出group请求
public static final int GroupQuitResponseMessage=14;//退出group响应
public static final int PingMessage=15;//客户端心跳消息,之后3s一次
public static final int RpcRequestMessage=101;//rpc请求消息
public static final int RpcResponseMessage=102;//rpc请求响应
public static HashMap<Integer,Class<?>> MessageMap=new HashMap<>();
static {
MessageMap.put(1,LoginRequestMessage.class);
MessageMap.put(2,LoginResponseMessage.class);
MessageMap.put(3,ChatRequestMessage.class);
MessageMap.put(4,ChatResponseMessage.class);
MessageMap.put(5,GroupChatRequestMessage.class);
MessageMap.put(6,GroupChatResponseMessage.class);
MessageMap.put(7,GroupCreatRequestMessage.class);
MessageMap.put(8,GroupCreatResponseMessage.class);
MessageMap.put(9,GroupDetailsRequestMessage.class);
MessageMap.put(10,GroupDetailsResponseMessage.class);
MessageMap.put(11,GroupJoinRequestMessage.class);
MessageMap.put(12,GroupJoinResponseMessage.class);
MessageMap.put(13,GroupQuitRequestMessage.class);
MessageMap.put(14,GroupQuitResponseMessage.class);
MessageMap.put(15,PingMessage.class);
MessageMap.put(101, com.ghc.message.RpcRequestMessage.class);
MessageMap.put(102, RpcResponseMessage.class);
}
}
主要有两个字段: messageType消息类型,用于区分消息,messageSequenceId,提供一个通信序列号。将消息类型和其对应的Class类存入map,便于后续Gson反序列化使用
实现响应的子类,rpc相关消息最后实现
为每个消息添加独有的字段,为了简单,暂时不考虑序列号
/**
* 聊天
*/
@Data
public class ChatRequestMessage extends Message{
private String source;
private String target;
private String content;
public ChatRequestMessage(String source, String target, String content) {
this.source = source;
this.target = target;
this.content = content;
}
@Override
public int getMessageT() {
return Message.ChatRequest;
}
@Override
public int getMessageSequenceId() {
return 0;
}
}
其他的消息也一样,加入自己特有的字段
四.开发序列化器
为了增强可拓展性,让序列化器可选择,这里定义一个Serializer接口,提供反序列方法和序列方法。
public interface Serializer {
//反序列化方法
<T> T deserialize(Class<T> clazz,byte[] bytes);
//序列化方法
<T> byte[] serialize(T object);
}
接着在其内部用枚举的方式定义JDK序列化器和Gson序列化器(注意要实现Serializer接口)
enum Algorithm implements Serializer{
Java{
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
return (T) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败",e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
new ObjectOutputStream(bytes).writeObject(object);
return bytes.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败",e);
}
}
},
Json{
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
String json = new String(bytes, StandardCharsets.UTF_8);
return new Gson.fromJson(json,clazz);
}
@Override
public <T> byte[] serialize(T object) {
String json = new Gson.toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
提供一个Config类,读取application.properties中的序列化方式
public class Config {
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static int getServerPort(){
return Integer.parseInt(properties.getProperty("server.port","8888"));
}
public static Serializer.Algorithm getSerializerAlgorithm(){
return Serializer.Algorithm.valueOf(properties.getProperty("serializer.algorithm","Java"));
}
}
之后可以在别的地方调用getSerializerAlgorithm()方法得到具体的序列化器
五.开发通信协议
首先继承netty中的MessageToMessageCodec类,实现其方法,之后加入NIOchannel的pipeline即可.
encode是编码,将消息对象转为byteBuf
@ChannelHandler.Sharable
/**
* 必须和LengthFieldBasedFrameDecoder一起使用,确保传过来的bytebuf是完整的,不需要存储状态
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, List<Object> list) throws Exception {
ByteBuf byteBuf = channelHandlerContext.alloc().buffer();
//1. 4字节的魔数
byteBuf.writeBytes(new byte[]{7, 7, 7, 7});
//2. 1字节的版本
byteBuf.writeByte(1);
//3. 用1字节代表序列化的方式 0代表jdk,1代表json
//枚举对象的ordinal()返回在类中的顺序下标,和数字对应起来
byteBuf.writeByte(Config.getSerializerAlgorithm().ordinal());
//4. 用1字节 代表指令类型 ,比如登录,注册,发消息,聊天
byteBuf.writeByte(message.getMessageT());
//写入一个对齐字节,无意义
byteBuf.writeByte(0xff);
//5. 4字节的指令序号
byteBuf.writeInt(message.getMessageSequenceId());
//6. 获取内容字节数组
byte[] bytes = Config.getSerializerAlgorithm().serialize(message);
//7.长度
byteBuf.writeInt(bytes.length);
//8.写入内容
byteBuf.writeBytes(bytes);
list.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int magicNum = byteBuf.readInt();
byte version = byteBuf.readByte();
byte serializerAlgorithm = byteBuf.readByte();
int messageType = byteBuf.readByte();
byteBuf.readByte();
int sequenceId = byteBuf.readInt();
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
//1.根据传过来的字节找到应该用的序列化算法
Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
//2.如果底层使用Gson,则须指定消息的实际类型,而不能是大Message抽象类
Message message = (Message) algorithm.deserialize(Message.MessageMap.get(messageType), bytes);
//加入list,以便下一个handler拿到结果
list.add(message);
}
}
六.实现Service层,包括UserService和Session,GroupSession
这里全部使用工厂模式,即先定义一个接口,再提供一个工厂,再提供一个具体的实现,用的时候通过工厂拿到具体的实现
UserService
public interface UserService {
/**
*
* @param username 用户名
* @param password 密码
* @return 登陆成功返回true,失败返回false
*/
boolean login(String username,String password);
}
实现校验登录功能,可以在具体的实现中写死用户名和密码,存在map中,只进行校验,也可以实现注册功能,提供一个注册register方法.
Session,用于维护用户和他的Channel,提供登陆绑定和退出解绑,和通过用户名拿到channel的方法(注意这里的channel是netty自己的channel)
import io.netty.channel.Channel;
/**
* 会话管理接口,把用户名和channel的信息进行管理
*/
public interface Session {
/**
* 绑定会话
* @param channel 哪个channel要绑定会话
* @param username 会话绑定用户
*/
void bind(Channel channel,String username);
/**
* 解绑会话
* @param channel 哪个channel要解绑会话
*/
void unBind(Channel channel);
/**
* 根据用户名返回他连接的channel
* @param username 哪个用户
* @return 连接的channel
*/
Channel getChannel(String username);
public String getUsername(Channel channel);
}
创建一个Group类,封装群名和成员信息
@Data
public class Group {
String name;
Set<String> members;
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
}
GroupSession
/**
* 聊天室管理接口
*/
public interface GroupSession {
/**
* 创建组,如果存在,返回null,
* @param name group 的名字
* @param members group 的成员
* @return 返回组
*/
Group createGroup(String name, Set<String> members);
/**
*
* @param name 组名
* @param member 成员名
* @return 如果组不存在,返回null
*/
Group joinMember(String name,String member);
/**
*
* @param name 组名
* @param member 成员名
* @return 组不存在返回null
*/
Group removeMember(String name,String member);
/**
*删除组
* @param name 组名
* @return 组名不存在返回null
*/
Group removeGroup(String name);
/**
* 获得成员
* @param name 名字
* @return 返回集合
*/
Set<String> getMembers(String name);
/**
* 获取组对应的channel
* @param name 组名
* @return channel集合
*/
List<Channel> getChannels(String name);
}
七.编写服务器处理各种request请求的handler
这里可以继承 SimpleChannelInboundHandler类,它可以指定接收的消息泛型,这样在pipeline中,其他类型的消息不会传过来
例:
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage msg) throws Exception {
String source = msg.getSource();
String target = msg.getTarget();
Channel targetChannel = SessionFactory.getSession().getChannel(target);
Channel sourceChannel = SessionFactory.getSession().getChannel(source);
if(targetChannel!=null){
targetChannel.writeAndFlush(ChatResponseMessage.success(source,target, msg.getContent()));
}else {
sourceChannel.writeAndFlush(ChatResponseMessage.fail("对方用户未上线或不存在,请重新发送信息"));
}
}
}
注意:所有的handler都要加@Sharable注解,不然netty在pipeline中认为这是一个线程不共享的handler,只会用一次,下一次消息进来就不会经过这个handler了。
具体的业务逻辑不是很复杂,大家自行实现
八.编写服务器
由于handler是sharable共享的,所以只需要创建一份
LengthFieldBasedFrameDecoder 是基于长度的帧长解码器,用于解决半包粘包的问题,原理是 指定数据第几位是长度,然后解码器会等待接收的数据达到长度,一次性发送,接收的时候同理。
用来判断是不是读空闲时间过长 或者写时间过长
// 5s没有收到某个channel的数据,会触发相应地IdleState#READER_IDEL时间,需要自己处理
nioSocketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0));
//双向的handler,既入站又出站
nioSocketChannel.pipeline().addLast(new ChannelDuplexHandler() {
//用来触发用户产生的特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.debug("5s没有读到数据");
}
}
});
这两个handler,第一个是netty提供的定时处理器,第一个参数为5指的是如果5s对同一个channel没有读时间,就会向后发出一个 IdleStateEvent的时间,
而第二个handler的用处就是专门接收IdleStateEvent时间,然后做出响应,这里5s没有接收到数据,做出的响应是打印日志,由于客户端3s发送一个PingMessage,5s没收到也可以直接认定该channel关闭了,解除用户的绑定。
@Slf4j
public class ChatServer {
public static void main(String[] args) {
//抽取出来,每个channel共用同一个
//messageCodec也是无状态的,只对这次传过来的数据处理,因此可以抽取出来
LoggingHandler loggingHandler = new LoggingHandler();
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
MessageCodecSharable messageCodec = new MessageCodecSharable();
LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
GroupCreatRequestMessageHandler groupCreatRequestMessageHandler = new GroupCreatRequestMessageHandler();
GroupDetailsRequestMessageHandler groupDetailsRequestMessageHandler = new GroupDetailsRequestMessageHandler();
GroupChatRequestMessageHandler groupChatRequestMessageHandler = new GroupChatRequestMessageHandler();
GroupQuitRequestMessageHandler groupQuitRequestMessageHandler = new GroupQuitRequestMessageHandler();
QuitHandler quitHandler = new QuitHandler();
RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler();
try {
ChannelFuture channelFuther = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
nioSocketChannel.pipeline().addLast(loggingHandler);
nioSocketChannel.pipeline().addLast(messageCodec);
// 用来判断是不是读空闲时间过长 或者写时间过长
// 5s没有收到某个channel的数据,会触发相应地IdleState#READER_IDEL时间,需要自己处理
nioSocketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0));
//双向的handler,既入站又出站
nioSocketChannel.pipeline().addLast(new ChannelDuplexHandler() {
//用来触发用户产生的特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
System.out.println("5s没有读到数据");
}
}
});
// 利用SimpleChannelInboundHandler可以接收特定的消息类型,方便处理用户各种数据
nioSocketChannel.pipeline().addLast(loginRequestMessageHandler);
nioSocketChannel.pipeline().addLast(chatRequestMessageHandler);
nioSocketChannel.pipeline().addLast(groupCreatRequestMessageHandler);
nioSocketChannel.pipeline().addLast(groupDetailsRequestMessageHandler);
nioSocketChannel.pipeline().addLast(groupChatRequestMessageHandler);
nioSocketChannel.pipeline().addLast(groupQuitRequestMessageHandler);
nioSocketChannel.pipeline().addLast(quitHandler);
nioSocketChannel.pipeline().addLast(rpcRequestMessageHandler);
}
}).bind(8888).sync();
channelFuther.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
九.编写客户端
public class ChatClient {
/**
* ?服务端的handler必须加sharable注解,不然不能启动多个客户端
* @param args
*/
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
//用户输入线程和服务器响应nio线程间通信,每次响应触发用户输入,执行操作
CountDownLatch signal = new CountDownLatch(1);
AtomicBoolean LOGIN = new AtomicBoolean(true);
try {
new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
//nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new MessageCodecSharable());
//心跳,每3s向服务器发心跳数据包
//3s没有写事件,会触发相应地IdleState#WRITER_IDEL时间,然后向服务器发数据包
nioSocketChannel.pipeline().addLast(new IdleStateHandler(0, 3, 0));
//双向的handler,既入站又出站
nioSocketChannel.pipeline().addLast(new ChannelDuplexHandler() {
//用来触发用户产生的特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
//System.out.println("3s没有写数据,发送一个心跳包");
ctx.writeAndFlush(new PingMessage());
}
}
});
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<LoginResponseMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginResponseMessage loginResponseMessage) throws Exception {
System.out.println(loginResponseMessage.getReason());
if (!loginResponseMessage.isSuccess()) {
channelHandlerContext.channel().close();
LOGIN.set(false);
}
signal.countDown();
}
});
}
}).connect("localhost", 8888).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
System.out.println("已退出...");
}
}
}
为了让NIO线程专注于接收服务端发来的响应数据,这里用异步的方式,在连接刚建立的时候,新建一个线程专门处理用户的输入IO功能,做法是加入一个handler,重写他的channelActive方法,新建一个线程
nioSocketChannel.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
//在连接建立后触发active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//在一个新线程中接收用户控制台的输入,账户名和密码
//尽量减少原生的线程的使用
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = scanner.nextLine();
System.out.println("请输入密码:");
String password = scanner.nextLine();
LoginRequestMessage message = new LoginRequestMessage(username, password);
ctx.writeAndFlush(message);
System.out.println("等待后续操作");
try {
signal.await();
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
while (true) {
System.out.println("====================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [groupname]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("rpc [msg]");
System.out.println("====================================");
System.out.println("您的操作时:");
String option = scanner.nextLine();
String[] s = option.split(" ");
switch (s[0]) {
case "send":
ChatRequestMessage chatRequestMessage = new ChatRequestMessage(username, s[1], s[2]);
ctx.writeAndFlush(chatRequestMessage);
break;
case "gsend":
GroupChatRequestMessage groupChatRequestMessage = new GroupChatRequestMessage(username, s[1], s[2]);
ctx.writeAndFlush(groupChatRequestMessage);
break;
case "gcreate":
String[] members = s[2].split(",");
GroupCreatRequestMessage groupCreatRequestMessage = new GroupCreatRequestMessage(s[1], members);
ctx.writeAndFlush(groupCreatRequestMessage);
break;
case "gmembers":
ctx.writeAndFlush(new GroupDetailsRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
System.out.println("退出系统.....");
ctx.channel().close();
return;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "input").start();
}
为了让登录失败后结束程序,我们需要在两个线程中通信,这里选择用一个原子Boolean,和一个CountDownLatch 实现,原理是用户IO线程收到控制台输入的用户名和密码,向服务器发送,此时调用CountDownLatch的await()方法在此阻塞,客户端的NIO线程收到响应,如果登陆成功,就将标志设为true,然后调用CountDownLatch的countDown()方法,唤醒用户线程,用户线程根据标志的值是否为true,决定是否继续执行
十.基于netty的简单RPC
由于篇幅有限,下一章更新RPC
版权声明:本文为a11157原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。