本篇主要是分享基于.Net的tcp服务端通信方案。其中基于unity的tcp客户端通信会在后面贴出来。另外关于tcp粘包断包的处理将放在这里分享《C# socket粘包断包处理》。
目录
定时器处理丢失连接的客户端并回调ClientSocket的定时器
整体设计
如图所示一共采用了三层封装来处理整个服务端的逻辑。
首先最上层的TcpServerMgr可以被业务层直接创建,并且通过事件系统将服务器收到的来自客户端的网络消息分发到各个业务系统中去。
其次TcpServer负责了整个tcp网络通信的创建,监听建立连接,关闭,心跳超时等等处理。对于正常接收到的消息通过一个回调函数传递给TcpServerMgr。
最后ClientSocket的设计主要是由于一个服务器要和多个客户端通信,因此单个客户端的通信相关逻辑(发送,接收,关闭,心跳超时)需要被封装起来供由TcpServer整理来管理。
TcpData-单条tcp通信数据
对于所有的tcp消息需要将其封装到一个单独的脚本里面。便于我们定义一条消息的结构,处理加密解密,处理构造和解析。
这里将TcpData的结构简单的分为了四块:
- byte header:数据头,占一个字节,主要用于标识一条tcp消息的开始。
- int len:消息长度,占四个字节,主要用于确定整个消息的长度(header+len+protocol+bytesData)。
- int protocol:协议号,占四个字节,这个是客户端和服务器约定的业务处理编号,比如1表示建立连接,2表示获取英雄数据等等
- byte[] bytesData:数据内容,占N个字节,取决于具体的业务需求。
为什么需要定义数据头,消息长度这些与业务无关的东西。主要还是为了解决tcp通信中的粘包和断包的问题。这个在这里会分享。
定义好数据之后再封装几个供外部调用的接口
- Read():将二进制的字节流转换为我们制定好的消息即header len protocol bytesData,供接收使用
- Get():将我们定义好的消息转换为二进制,供发送使用
其他的一些接口都是辅助这些的并部太重要。
这里是完整的代码,中间有一块关于加密的处理,具体加密的实现并不重要,有兴趣可以看下这里###。这里只加密了数据内容的部分(跟加密的方式有一点关系)。
namespace GYSQ.Net.Tcp
{
// tcp数据处理
public class TcpData : Poolable<TcpData>
{
public static readonly byte TCP_HEAD_MARK = 0b10011011;
public static readonly int TCP_HEADER_LEN = 9;
// 加密
private static Encry mEncry = new Encry("gysq_tcp_key-123456");
// 数据头标记
public byte header = TCP_HEAD_MARK;
// 数据长度
public int len;
// 协议号
public int protocol;
// 数据内容
public byte[] bytesData;
// 读取字节
public void Read(byte[] arryData)
{
using (MemoryStream ms = new MemoryStream(arryData))
{
using (BinaryReader br = new BinaryReader(ms))
{
header = br.ReadByte();
len = br.ReadInt32();
protocol = br.ReadInt32();
bytesData = br.ReadBytes(len - TCP_HEADER_LEN);
// 解密
mEncry.DoEncry(bytesData);
}
}
}
// 获取加密后的二进制
public byte[] Get()
{
using (MemoryStream ms = new MemoryStream())
{
using (BinaryWriter bw = new BinaryWriter(ms))
{
bw.Write(header);
bw.Write(len);
bw.Write(protocol);
if (bytesData != null)
{
bw.Write(bytesData);
}
// 加密
byte[] data = ms.ToArray();
mEncry.DoEncry(data, TCP_HEADER_LEN);
return data;
}
}
}
// 构建消息
public void Build(int protocol, byte[] bytesData = null)
{
header = TCP_HEAD_MARK;
len = TCP_HEADER_LEN + (bytesData == null ? 0 : bytesData.Length);
this.protocol = protocol;
this.bytesData = bytesData;
}
// 获取json
public string GetJsonContent()
{
return Encoding.UTF8.GetString(bytesData);
}
public void Build(int protocol, string strJson)
{
if (string.IsNullOrEmpty(strJson))
{
Build(protocol);
}
else
{
Build(protocol, Encoding.UTF8.GetBytes(strJson));
}
}
protected override void OnDispose()
{
bytesData = null;
base.OnDispose();
}
}
}
ClientSocket-单个客户端的tcp逻辑处理
ClientSocket被构造的前提是服务器已经和某个客户端建立好连接。这块的脚本主要是处理发送、接收、关闭连接、将收到的完整消息传递出去。
发送处理
首先构建一个发送队列,里面存放的是需要被发送的对象TcpData。
启动一个线程,从队列中取出需要被发送的对象并通过Socket.Send发送出去
外部发送的时候只需要向队列中添加构建好的TcpData即可
这里开线程主要是将一些二进制转换的处理放到线程里面避免对主线程造成阻塞。
// 发送消息线程
Thread threadSend = null;
private BlockQueue<TcpData> mSendQueue = new BlockQueue<TcpData>(50);
// 开启发送线程
private void StartHanle()
{
// 开启发送线程
threadSend = new Thread(HandleSend);
threadSend.IsBackground = true;
threadSend.Start();
}
// 发送处理
private void HandleSend()
{
while (bAlive)
{
try
{
if (mSendQueue.TryDequeue(out var tcpData))
{
byte[] bytesData = tcpData.Get();
socket.Send(bytesData);
}
}
catch (Exception ex)
{
FDebug.Log("服务器发送消息异常:" + ex.Message);
bConnect = false;
}
}
}
// 外部发送接口
public void Send(TcpData tcpData)
{
mSendQueue.Enqueue(tcpData);
}
接收处理
和发送处理类似的
构建接收队列
将接收到的二进制转换为TcpData放到队列中
主线程定时接口中将完整的TcpData数据传递到外部
这里有一个对象tcpMessage主要是用来处理粘包和断包的
// 接收消息线程
Thread threadRecive = null;
private BlockQueue<TcpData> mRecvQueue = new BlockQueue<TcpData>(50);
// 开启接收线程
private void StartHanle()
{
// 开启接收线程
threadRecive = new Thread(HandleRecieve);
threadRecive.IsBackground = true;
threadRecive.Start();
}
// 接收处理
private void HandleRecieve()
{
while (bAlive)
{
try
{
int intLength = socket.Receive(mArryBytesRecMsg, 0, mArryBytesRecMsg.Length, SocketFlags.None);
if (intLength < 1)
continue;
byte[] arryBytes = CommonTool.SubArry(mArryBytesRecMsg, 0, intLength);
// 处理粘包断包后
if (tcpMessage.HandleRecMessage(arryBytes))
{
TcpData tcpData = null;
while (tcpMessage.TryGetReciveTcpData(out tcpData))
{
// 处理业务消息
mRecvQueue.Enqueue(tcpData);
}
}
}
catch (Exception ex)
{
FDebug.Log("服务器接收消息异常:" + ex.Message);
bConnect = false;
}
}
}
定时处理,心跳
定时逻辑由TcpServer提供,TcpServer遍历所有的客户端并执行Update方法。
定时处理里主要承担的是将队列中的消息取出并传递到外部。
另外处理心跳逻辑,即客户端一定时间内没有向服务器推送TcpProtocol.TCP_C2S_HEART这条协议即判定丢失连接。
这里为什么不直接再接收到消息的时候就回调呢,主要是为了让服务器对消息的处理是有序的。避免各个客户端的消息先到先处理。至于具体怎么处理顺序可以由TcpServerMgr传入的回调方法去处理它。当然先到先处理也ok的。
// 定时器
public void Update(int intDeltaTime)
{
if (!bConnect)
{
return;
}
// 处理消息
if (mRecvQueue.count > 0)
{
var tcpData = mRecvQueue.Dequeue();
if (tcpData.protocol == TcpProtocol.TCP_C2S_HEART)
{
mIntPingTimer = 0;
}
mFnOnRecv?.Invoke(this, tcpData);
}
// 处理心跳
mIntPingTimer += intDeltaTime;
if (mIntPingTimer > PING_TIME_OUT)
{
bConnect = false;
}
}
关闭连接
这块要将bAlive改为false,这样我们的发送和接收线程可以执行完毕并结束。
另外就是关闭socket,清空队列和线程。
// 关闭
public void Close()
{
bAlive = false;
if (socket != null && socket.Connected)
{
socket.Shutdown(SocketShutdown.Both);
socket.Close();
}
Thread.Sleep(50);
mRecvQueue.Close();
mSendQueue.Close();
threadSend = null;
threadRecive = null;
FDebug.Log("服务器断开了客户端的连接:" + strKey);
}
完整的代码
其中socket是和服务器建立好连接的客户端套接字
strKey是客户端的唯一标识由ip和端口组合而成
// 服务器客户端套接字处理
public class ClientSocket
{
// 激活标记
public bool bAlive = true;
// 套接字
public Socket socket;
// 键
public string strKey;
// 连接标记
public bool bConnect = true;
// 心跳计时器
private int mIntPingTimer = 0;
// 心跳超时时间
private static readonly int PING_TIME_OUT = 15000;
// 消息缓冲大小
private static readonly int RECV_BUFFER = 2 * 1024 * 1024;
// 缓冲字节数组
private byte[] mArryBytesRecMsg;
// 消息处理
public TcpMsgRecv tcpMessage = new TcpMsgRecv();
// 发送消息线程
Thread threadSend = null;
private BlockQueue<TcpData> mSendQueue = new BlockQueue<TcpData>(50);
// 接收消息线程
Thread threadRecive = null;
private BlockQueue<TcpData> mRecvQueue = new BlockQueue<TcpData>(50);
// 接收消息回调
private Action<ClientSocket, TcpData> mFnOnRecv;
// 建立连接的socket
public ClientSocket(Socket socket, string strKey, Action<ClientSocket, TcpData> fnOnRecv)
{
mArryBytesRecMsg = new byte[RECV_BUFFER];
this.socket = socket;
this.strKey = strKey;
mFnOnRecv = fnOnRecv;
mRecvQueue.Reset();
mSendQueue.Reset();
StartHanle();
}
// 开启发送线程和接收线程
private void StartHanle()
{
// 开启发送线程
threadSend = new Thread(HandleSend);
threadSend.IsBackground = true;
threadSend.Start();
// 开启接收线程
threadRecive = new Thread(HandleRecieve);
threadRecive.IsBackground = true;
threadRecive.Start();
}
// 发送处理
private void HandleSend()
{
while (bAlive)
{
try
{
if (mSendQueue.TryDequeue(out var tcpData))
{
byte[] bytesData = tcpData.Get();
socket.Send(bytesData);
}
}
catch (Exception ex)
{
FDebug.Log("服务器发送消息异常:" + ex.Message);
bConnect = false;
}
}
}
// 定时器
public void Update(int intDeltaTime)
{
if (!bConnect)
{
return;
}
// 处理消息
if (mRecvQueue.count > 0)
{
var tcpData = mRecvQueue.Dequeue();
if (tcpData.protocol == TcpProtocol.TCP_C2S_HEART)
{
mIntPingTimer = 0;
}
mFnOnRecv?.Invoke(this, tcpData);
}
// 处理心跳
mIntPingTimer += intDeltaTime;
if (mIntPingTimer > PING_TIME_OUT)
{
bConnect = false;
}
}
// 接收处理
private void HandleRecieve()
{
while (bAlive)
{
try
{
int intLength = socket.Receive(mArryBytesRecMsg, 0, mArryBytesRecMsg.Length, SocketFlags.None);
if (intLength < 1)
continue;
byte[] arryBytes = CommonTool.SubArry(mArryBytesRecMsg, 0, intLength);
// 处理粘包断包后
if (tcpMessage.HandleRecMessage(arryBytes))
{
TcpData tcpData = null;
while (tcpMessage.TryGetReciveTcpData(out tcpData))
{
// 处理业务消息
mRecvQueue.Enqueue(tcpData);
}
}
}
catch (Exception ex)
{
FDebug.Log("服务器接收消息异常:" + ex.Message);
bConnect = false;
}
}
}
// 发送接口
public void Send(TcpData tcpData)
{
mSendQueue.Enqueue(tcpData);
}
// 关闭
public void Close()
{
bAlive = false;
if (socket != null && socket.Connected)
{
socket.Shutdown(SocketShutdown.Both);
socket.Close();
}
Thread.Sleep(50);
mRecvQueue.Close();
mSendQueue.Close();
threadSend = null;
threadRecive = null;
FDebug.Log("服务器断开了客户端的连接:" + strKey);
}
}
TcpServer-服务端tcp管理器
该脚本主要用了一个字典将所有连接好的客户端储存起来。
提供客户端连接的监听和ClientSocket的创建。
提供定时器逻辑处理-移除丢失连接的客户端,定时从每个ClientScoket处理好的消息抛出给TcpServerMgr。
给TcpServerMgr提供一个发送消息的接口(发送给全部客户端)
创建连接监听任务
首先创建服务器的套接字,端口由外部传入
创建监听线程:当有客户端连接的时候创建一个ClientSocket对象并保存到字典中
// 建立监听套接字
mSocketServerWatch = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint point = new IPEndPoint(IPAddress.Any, intPort);
mSocketServerWatch.Bind(point);
// 设置监听队列长度
mSocketServerWatch.Listen(intMaxListenNum);
// 创建监听线程
Thread threadWatch = new Thread(WatchConnect);
threadWatch.IsBackground = true;
threadWatch.Start();
// 监听客户端
private void WatchConnect()
{
Socket socketClient = null;
while (bAlive)
{
try
{
socketClient = mSocketServerWatch.Accept();
// 注册客户端
string remoteEndPoint = socketClient.RemoteEndPoint.ToString();
ClientSocket clsClientSocket = new ClientSocket(socketClient, remoteEndPoint, mFnOnRecv);
mDicSocketClients.Add(remoteEndPoint, clsClientSocket);
}
catch(Exception e)
{
FDebug.Log("套接字监听异常:" + e.Message);
}
}
}
定时器处理丢失连接的客户端并回调ClientSocket的定时器
这里的Timer是System.Threading里的Timer
System.Timers本来也是很好用的,但是比较头疼的是找不到这个库,调试的时候可以运行,一旦发布就没了。
// 定时器
private Timer mTimerRecv;
// 启动定时器
mTimerRecv = new Timer(OnTimer, null, 500, 100);
// 更新客户端
private void OnTimer(object sender)
{
// 移除失效的连接
RemoveDisConnectClients();
// 执行连接
SearchClients((clsClientScoket) =>
{
clsClientScoket.Update(DELTA_TIME);
});
}
// 遍历所有客户端
private void SearchClients(Action<ClientSocket> fnSearch)
{
foreach (ClientSocket clsClientScoket in mDicSocketClients.Values)
{
fnSearch(clsClientScoket);
}
}
// 移除客户端
private void RemoveDisConnectClients()
{
// 将丢失连接的客户端添加到移除队列
foreach (var kvClient in mDicSocketClients)
{
if (!kvClient.Value.bConnect)
{
mWaitClearClients.Add(kvClient.Key);
}
}
// 清空移除队列
int clearLen = mWaitClearClients.Count;
if (clearLen > 0)
{
for (int i = clearLen - 1; i > -1; i--)
{
var client = mDicSocketClients[mWaitClearClients[i]];
client.Close();
mDicSocketClients.Remove(client.strKey);
}
mWaitClearClients.Clear();
}
}
完整的代码
// tcp服务端
public class TcpServer
{
// 端口号
private int mPort;
public int port
{
get
{
return mPort;
}
}
// 激活标记
private bool bAlive;
// 服务器套接字
private Socket mSocketServerWatch = null;
// 所有连接的客户端集合
private Dictionary<string, ClientSocket> mDicSocketClients = new Dictionary<string, ClientSocket>();
// 待移除的客户端
private List<string> mWaitClearClients = new List<string>();
// 接收到消息回调
private Action<ClientSocket, TcpData> mFnOnRecv;
// 定时器
private Timer mTimerRecv;
private const int DELTA_TIME = 100;
// 初始化(监听)
public void Init(int intPort, Action<ClientSocket, TcpData> fnOnRecv, int intMaxListenNum = 20)
{
// 激活
bAlive = true;
mPort = intPort;
mFnOnRecv = fnOnRecv;
// 建立监听套接字
mSocketServerWatch = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint point = new IPEndPoint(IPAddress.Any, intPort);
mSocketServerWatch.Bind(point);
// 设置监听队列长度
mSocketServerWatch.Listen(intMaxListenNum);
// 创建监听线程
Thread threadWatch = new Thread(WatchConnect);
threadWatch.IsBackground = true;
threadWatch.Start();
// 启动定时器
mTimerRecv = new Timer(OnTimer, null, 500, 100);
}
// 更新客户端
private void OnTimer(object sender)
{
// 移除失效的连接
RemoveDisConnectClients();
// 执行连接
SearchClients((clsClientScoket) =>
{
clsClientScoket.Update(DELTA_TIME);
});
}
// 监听客户端
private void WatchConnect()
{
Socket socketClient = null;
while (bAlive)
{
try
{
socketClient = mSocketServerWatch.Accept();
// 注册客户端
string remoteEndPoint = socketClient.RemoteEndPoint.ToString();
ClientSocket clsClientSocket = new ClientSocket(socketClient, remoteEndPoint, mFnOnRecv);
mDicSocketClients.Add(remoteEndPoint, clsClientSocket);
}
catch(Exception e)
{
FDebug.Log("套接字监听异常:" + e.Message);
}
}
}
// 遍历所有客户端
private void SearchClients(Action<ClientSocket> fnSearch)
{
foreach (ClientSocket clsClientScoket in mDicSocketClients.Values)
{
fnSearch(clsClientScoket);
}
}
// 移除客户端
private void RemoveDisConnectClients()
{
// 将丢失连接的客户端添加到移除队列
foreach (var kvClient in mDicSocketClients)
{
if (!kvClient.Value.bConnect)
{
mWaitClearClients.Add(kvClient.Key);
}
}
// 清空移除队列
int clearLen = mWaitClearClients.Count;
if (clearLen > 0)
{
for (int i = clearLen - 1; i > -1; i--)
{
var client = mDicSocketClients[mWaitClearClients[i]];
client.Close();
mDicSocketClients.Remove(client.strKey);
}
mWaitClearClients.Clear();
}
}
#region 外部接口
// 发送消息到所有客户端
public void Send(TcpData tcpData)
{
foreach (var clsClientScoket in mDicSocketClients.Values)
{
clsClientScoket.Send(tcpData);
}
}
#endregion
}
TcpServerMgr-tcp服务器业务层管理器
这个脚本主要创建TcpServer并初始化
承担了TcpServer和业务层沟通的桥梁
具体的沟通桥梁实现方式可以自己定义
这里的TcpServerMgr是个抽象类,将初始化和接收消息的处理抛给了子类去处理。
之前说的服务器决定按怎样的先后顺序处理客户端消息就可以再接收接口中去自己实现。
后面给了一个子类实现的例子,接收消息时处理了连接协议、心跳协议的返回、通过事件系统将消息传递到其他业务系统。
using GYSQ.Net.Tcp.Server;
using GYSQ.Net.Tcp;
// tcp业务管理
public abstract class TcpServerMgr
{
// tcp
protected TcpServer tcpServer = new TcpServer();
// 初始化
public void Init(int port)
{
// 启动tcp服务端
tcpServer.Init(port, OnRecv);
// 子类初始化
OnInit();
}
// 发送消息
public void Send(int protocol, string jsonData = null, ClientSocket clientSocket = null)
{
TcpData tcpData = TcpData.GetPooled();
tcpData.Build(protocol, jsonData);
if (clientSocket != null)
{
clientSocket.Send(tcpData);
}
else
{
tcpServer.Send(tcpData);
}
}
#region 子类
// 初始化
protected abstract void OnInit();
// 接收到消息回调
protected abstract void OnRecv(ClientSocket clientSocket, TcpData tcpData);
#endregion
}
下面这个继承的TcpGameMgr就是一个实现的参考。
using GYSQ.Net.Tcp;
using GYSQ.Net.Tcp.Server;
using System;
using System.Collections.Generic;
using System.Text;
// 游戏tcp
public class TcpGameMgr : TcpServerMgr
{
protected override void OnInit()
{
// 初始化网络事件系统
TcpEvent.instance.Init();
}
protected override void OnRecv(ClientSocket clientSocket, TcpData tcpData)
{
// 处理连接消息和心跳
if (tcpData.protocol == TcpProtocol.TCP_C2S_CONNECT)
{
Send(TcpProtocol.TCP_S2C_CONNECT);
}
else if (tcpData.protocol == TcpProtocol.TCP_C2S_HEART)
{
Send(TcpProtocol.TCP_S2C_HEART);
}
FDebug.LogServerTcp(tcpServer.port, clientSocket.strKey, tcpData.protocol, tcpData.bytesData);
// 将消息推送至业务层
TcpEvent.instance.NotifyEvent(tcpData.protocol, tcpData.GetJsonContent());
// 回收
tcpData.Dispose();
}
}
启动tcp服务
主线程中启动下面的代码即可
TcpGameMgr tcpGameMgr = new TcpGameMgr();
tcpGameMgr.Init(8089);