NIO 模型实现 ChatServer
Channel 负责传输, Buffer 负责存储。
1.与bio相同的地方:端口号、关键字、缓冲区大小。
2.不同之处:bio用serversocket,nio用ServerSocketChannel,与之对应需要实现2个byte对象。bio用了线程池复用线程,nio用单线程解决即可,需要加入selector。
start过程:
- 打开ServerSocketChannel,设置为非阻塞,把它绑定到监听端口。创建selector,在服务端channel上注册accept事件给selector监听(和事件相关的信息都包含在selectionkey里面)。
- 在循环中让selector阻塞式调用select函数(它的返回值是个整数,代表多少个监视事件被处罚了)。通过selector返回selectedKeys(set形式),然后依次处理。全部处理完后用clear手动清空已处理事件。不清空的化,下次调用selector会把剩下的加入下一轮while循环。
- 最后close掉selector。selector会解除所有注册,也会关闭对应的channel。
**handle过程:**排除法思想
- 处理ACCEPT事件–和客户建立了链接:获取服务器通道,通过这个通道获取客户端通道并设置非阻塞,客户端通道上注册需要监听的READ事件。
- 处理READ事件—客户发送消息,有了可读的事件:获取客户端通道,用read获得从通道中传来的数据,读进buffer里(这里一直用read读,直到read的返回值=0,然后把buffer从写模式切换为读模式)。如果是空消息,意味着连接异常,使用cancel不再监听已经注册的事件。如果不是空,把消息发给其他在线客户端(forwardMessage)。
- forwardMessage群发数据:首先找到所有在线的客户端。selector.keys()可以找到所有注册了的事件,但是要跳过服务器的ServerSocketChannel,且还要不是发送这个消息的客户端本身。 **如何发送?**先清空buffer,然后把数据put进buffer,flip把写状态翻转成读状态。把buffer写进客户端的channel里去。
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
/**
* 使用nio编程模型实现多人聊天室-服务端
*
* @author caojx created on 2020/6/23 6:50 下午
*/
public class ChatServer {
private static final int DEFAULT_PORT = 8888;
private static final String QUIT = "quit";
/**
* 缓冲区大小
*/
private static final int BUFFER = 1024;
/**
* 服务器端通道,使用通道进行通信
*/
private ServerSocketChannel serverSocketChannel;
/**
* 选择器
*/
private Selector selector;
/**
* 用来读取消息的buffer
*/
private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
/**
* 用来写入信息的buffer
*/
private ByteBuffer wBeBuffer = ByteBuffer.allocate(BUFFER);
//强制性要求发送接收都是utf-8
private Charset charset = Charset.forName("UTF-8");
private int port;
public ChatServer(int port) {
this.port = port;
}
/**
* 检查是否退出
*
* @param msg
* @return
*/
public boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}
/**
* 开始启动
*/
public void start() {
try {
// 创建一个ServerSocketChannel通道
serverSocketChannel = ServerSocketChannel.open();
// 设置通道为未非阻塞(默认是阻塞的)
serverSocketChannel.configureBlocking(false);
// 绑定到监听端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 创建selector
selector = Selector.open();
// 服务端通道上注册需要监听的ACCEPT客户端连接请求事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("启动服务器,监听端口:" + port + ".....");
// 进入监听模式
while (true) {
// select()函数是阻塞式的
selector.select();
// 获取监听事件,每一个被触发的事件与他相关的信息都包装在SelectionKey对象中
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
// 处理被触发事件
handles(selectionKey);
}
// 手动把已处理的事件清空
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close(selector);
}
}
/**
* 处理被触发事件
* 主要处理两种事件
* 1.客户端连接请求时间ACCEPT事件
* 2.已连接的客户端发送消息后的READ事件
*
* @param selectionKey
* @throws IOException
*/
private void handles(SelectionKey selectionKey) throws IOException {
// ACCEPT事件--和客户建立了链接
if (selectionKey.isAcceptable()) {
// 获服务器通道,即返回为之创建此键的通道
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
// 获取客户端通道,并接受客户端连接请求。类似bio
SocketChannel client = server.accept();
// 设置非阻塞
client.configureBlocking(false);
// 客户端通道上注册需要监听的READ事件
client.register(selector, SelectionKey.OP_READ);
System.out.println("客户端[" + client.socket().getPort() + "]" + "客户端链接了");
// READ事件---客户发送消息,有了可读的事件
} else if (selectionKey.isReadable()) {
// 获取客户端通道,并读取客户端发送过来的消息
SocketChannel client = (SocketChannel) selectionKey.channel();
String fwMsg = receive(client);
if (fwMsg.isEmpty()) {
// 客户端异常,不再监听该客户端可能发送过来的消息
selectionKey.cancel();
// 事件发生了变化,更新selector监听的事件
selector.wakeup();
} else {
// 消息转发给其他在线的客户端
forwardMessage(client, fwMsg);
// 检查用户是否退出
if (readyToQuit(fwMsg)) {
selectionKey.cancel();
selector.wakeup();
System.out.println("客户端[" + client.socket().getPort() + "]" + "断开链接了");
}
}
}
}
/**
* 转发消息给客户端
*
* @param client 发送消息的客户端本身
* @param fwMsg 消息
*/
private void forwardMessage(SocketChannel client, String fwMsg) throws IOException {
// selector.keys() 会返回所有已经注册在selector上的SelectionKey的集合,
// 我们可以认为注册在selector上的SelectionKey即是当前在线的客户端
//这个keys()是所有注册的事件,区别于之前触发的事件。
for (SelectionKey key : selector.keys()) {
// 跳过服务器端的通道 ServerSocketChannel
Channel connectedClient = key.channel();
if (connectedClient instanceof ServerSocketChannel) {
continue;
}
// 检测channel没有被关闭,且通道不是自己本身
if (key.isValid() && !client.equals(connectedClient)) {
wBeBuffer.clear();
wBeBuffer.put(charset.encode(fwMsg));
wBeBuffer.flip();
while (wBeBuffer.hasRemaining()) {
((SocketChannel) connectedClient).write(wBeBuffer);
}
}
System.out.println("客户端[" + client.socket().getPort() + "]" + fwMsg);
}
}
/**
* 读取channel上面的信息
*
* @param client
* @return
* @throws IOException
*/
private String receive(SocketChannel client) throws IOException {
// 清理残留的内容
rBuffer.clear();
while (client.read(rBuffer) > 0) ;
// 写模式切换回读模式
rBuffer.flip();
return String.valueOf(charset.decode(rBuffer));
}
/**
* 关闭资源
*/
public void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ChatServer charServer = new ChatServer(DEFAULT_PORT);
charServer.start();
}
}
版权声明:本文为qq_43378019原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。