游戏服务器的逻辑服务器下有逻辑网关(逻辑网关可有多个),逻辑网关下有一个监听线程、一个接收线程、一个接收处理线程和多个发送线程。
所以,主线程、监听线程、接收线程、接收处理线程、发送线程之间需要对会话的数据进行管理。
设计上:
(1)在主线程的所有的网关对象管理会话列表的初始化和销毁,在启动网关时会初始化。
(2)监听线程监听到新的连接会创建会话(分配发送缓冲区和接收缓冲区),并添加到会话列表。
(3)接收数据处理线程遍历会话列表,标识关闭长时间(10min)不活动的会话。对于被标识关闭(可能在别的线程标识的)的会话需要释放会话的资源(接收缓冲区和发送缓冲区),并且置空该会话在会话列表中的位置(以便以后放置新的会话)。
(4)发送线程关闭发送错误的会话(并标识关闭),关闭发送包过大(4MB)的会话。
(5)接收线程关闭接收出错的会话(以及标记关闭的会话)。
1、网关会话列表初始化和销毁(主线程)
(1)网关启动初始化会话列表
BOOL ExecSockDataMgr::Start()
{
......
//初始化会话队列
InitSessions();
......
}
VOID ExecSockDataMgr::InitSessions()
{
EnterCriticalSection( &m_SessionLock );
m_SessionList.reserve(65536);
LeaveCriticalSection( &m_SessionLock );
}
(2)网关关闭,关闭所有的会话
关闭和销毁会话列表里的所有的会话,回收其内存
VOID ExecSockDataMgr::Stop()
{
......
CancelRemainSendSessionBuffers();
CloseAllSessions( TRUE );//关闭所有的会话
UninitSessions();//释放所有的会话
m_boStarted = FALSE;
}释放该网关中所有的会话
VOID ExecSockDataMgr::UninitSessions()
{
EnterCriticalSection( &m_SessionLock );
PRUNGATEUSERSESSION *pSessionList = m_SessionList;
PRUNGATEUSERSESSION pSession;
for (INT_PTR i=m_SessionList.count()-1; i>-1; --i)
{
pSession = pSessionList[i];
if (pSession)
{
FreeSession(pSession);
}
}
m_SessionList.clear();
m_SessionAllocator.freeAll();
LeaveCriticalSection( &m_SessionLock );
}关闭所有的会话
VOID ExecSockDataMgr::CloseAllSessions(BOOL boForceClose)
{
INT_PTR i;
TICKCOUNT dwCurTick;
PRUNGATEUSERSESSION pSession;
EnterCriticalSection( &m_SessionLock );//会话锁
EnterCriticalSection( &m_RecvQueueLock );//接收队列锁(单个网关内接受数据线程有一个)
for ( i=0; i<m_nSendThreadCount; ++i )
{
EnterCriticalSection( &m_SendThreads[i].SendQueueLock );// 发送队列锁(单个网关内发送线程有多个)
}
moon::OS::osSleep(3);//休眠以等待数据接收以及各发送线程被阻塞
dwCurTick = _getTickCount();
for ( i=m_SessionList.count()-1; i>-1; --i )
{
pSession = m_SessionList[i];
if ( pSession && pSession->nSocket != INVALID_SOCKET )
{
if ( boForceClose )
{
m_SessionList[i] = NULL;
PostEngineClosePlayer( pSession );
closesocket( pSession->nSocket );
pSession->boMarkToClose = true;
pSession->boRemoteClosed = true;
pSession->dwCloseTick = 0;
pSession->nSocket = INVALID_SOCKET;
pSession->nVerifyIdx = 0;
FreeSession(pSession);
}
else CloseSession( pSession );//关闭会话(关闭socket和标识关闭会话)
}
}
for ( i=0; i<m_nSendThreadCount; ++i )
{
LeaveCriticalSection( &m_SendThreads[i].SendQueueLock );
}
LeaveCriticalSection( &m_RecvQueueLock );
LeaveCriticalSection( &m_SessionLock );
}2、创建会话(监听线程)
添加会话到会话列表
PRUNGATEUSERSESSION ExecSockDataMgr::NewSession(SOCKET nSocket, SOCKADDRIN RemoteAddr)
{
PRUNGATEUSERSESSION pNewSession = NULL;
EnterCriticalSection( &m_SessionLock );//会话锁
pNewSession = m_SessionAllocator.allocObject();//分配新的会话
pNewSession->nSocket = INVALID_SOCKET;
pNewSession->SendBuf.nOffset = 0;
pNewSession->dwCloseTick = 0;
pNewSession->boMarkToClose = false;//关闭标识
pNewSession->boRemoteClosed = false;//等待关闭标识
PRUNGATEUSERSESSION *pSessionList = m_SessionList;
for (INT_PTR i=m_SessionList.count()-1; i>-1; --i)//从后往前查找
{
if (!pSessionList[i])//找到会话列表里的空位置
{
pSessionList[i] = pNewSession;
pNewSession->nIndex = (int)i;//设置会话的索引
pSessionList = NULL;
break;
}
}
if (pSessionList)//找不到空闲的位置,就添加到最后面,需要拓展会话列表的长度
{
pNewSession->nIndex = (int)m_SessionList.add(pNewSession);
}
LeaveCriticalSection( &m_SessionLock );
if ( pNewSession )//内存地址可能改变(在列表添加操作中被修改内存位置)
{
pNewSession->SockAddr= RemoteAddr;
pNewSession->nPlayerIndex= -1;
pNewSession->pPlayer= NULL;
pNewSession->wPacketVerifyKey= 0;
pNewSession->nRecvPacketCount=0;
pNewSession->nSendPacketCount=0;
pNewSession->btUserState= eGUSWaitRequestKey;
pNewSession->boMarkToClose = false;
pNewSession->boRemoteClosed = false;
pNewSession->boSendAvaliable= true;
pNewSession->dwSendTimeOut= 0;
pNewSession->dwCloseTick= 0;
pNewSession->dwConnectTick= _getTickCount();
pNewSession->dwClientMsgTick= pNewSession->dwConnectTick;
pNewSession->dwServerMsgTick= pNewSession->dwConnectTick;
pNewSession->dwMsgIntervalTick = pNewSession->dwConnectTick;
pNewSession->nVerifyIdx= InterlockedIncrement( (LONG*)&m_nUserVerify );
pNewSession->nIntervalMsgCount = 0;
pNewSession->nAccountId= 0;
pNewSession->sAccount[0]= 0;
pNewSession->PlayerId.llid= 0;
pNewSession->sCharName[0]= 0;
if ( !pNewSession->RecvBuf.lpBuffer )
{
pNewSession->RecvBuf.nSize = SESSION_RECV_BUFSIZE ;
pNewSession->RecvBuf.lpBuffer = (char*)malloc( pNewSession->RecvBuf.nSize );
}
pNewSession->RecvBuf.nOffset = 0;
if ( !pNewSession->SendBuf.lpBuffer )
{
pNewSession->SendBuf.nSize = SESSION_SEND_BUFSIZE ;
pNewSession->SendBuf.lpBuffer = (char*)malloc( pNewSession->SendBuf.nSize );
}
pNewSession->SendBuf.nOffset = 0;
//最后设置会话的nSocket成员数据,
//以防止其他线程通过nSocket != INVALID_SOCKET判断后对一个尚为初始化完毕的会话进行操作!
pNewSession->nSocket = nSocket;
m_nActiveUser++;//增加活动用户计数
}
return pNewSession;
}3、检查需要关闭的会话(接收数据处理线程)
首先需要标识关闭长时间(10min)不活动的会话。
对于被标识关闭的会话需要释放会话的资源(接收缓冲区和发送缓冲区)(需要回收会话对象回内存分配器)。
BOOL ExecSockDataMgr::CheckCloseSessions(BOOL boForceCheck)
{
static const DWORD dwCloseIdleSessionLong = 10 * 60 * 1000;
BOOL boResult;
TICKCOUNT dwCurTick;
INT_PTR i;
PRUNGATEUSERSESSION pSession;
if ( boForceCheck )
{
EnterCriticalSection( &m_SessionLock );
boResult = TRUE;
}
else boResult = TryEnterCriticalSection( &m_SessionLock );
if ( boResult )
{
dwCurTick = _getTickCount();
for ( i=m_SessionList.count()-1; i>-1; --i )//遍历会话列表中的存活的会话
{
pSession = m_SessionList[i];
if (!pSession)
continue;
if ( pSession->boMarkToClose || pSession->boRemoteClosed )
{
//加上pSession->dwCloseTick非0判断,防止内存错误
//可能的原因是标记pSession->boMarkToClose或pSession->boRemoteClosed为true的线程尚未来得及
//给pSession->dwCloseTick赋值,而此线程恰好已经执行到此处对pSession的资源进行释放,从而
//造成内存错误。延时回收也是为了避免这里回收了会话而其他线程使用该会话造成的内存错误
if ( pSession->dwCloseTick && dwCurTick - pSession->dwCloseTick >= 10 * 1000 )
{
m_SessionList[i] = NULL;//置空会话列表中的该会话
LOCK_SESSION_SEND( pSession );//需要加会话的锁
pSession->nSocket = INVALID_SOCKET;
pSession->nVerifyIdx = 0;
//释放会话只是释放其发送缓冲区和接收缓冲区
FreeSession(pSession);
pSession->boMarkToClose = false;
pSession->boRemoteClosed = false;
pSession->dwCloseTick = 0;
m_SessionAllocator.freeObject(pSession);//回收会话对象到会话内存分配器
UNLOCK_SESSION_SEND( pSession );
}
}
else if ( pSession->nSocket != INVALID_SOCKET )
{
if ( (dwCurTick > pSession->dwClientMsgTick && dwCurTick - pSession->dwClientMsgTick >= dwCloseIdleSessionLong)
||(dwCurTick > pSession->dwServerMsgTick && dwCurTick - pSession->dwServerMsgTick >= dwCloseIdleSessionLong)
)
{
logWarn("关闭了一个较长时间处于非活动状态的会话。");
CloseSession( pSession );
}
}
}
LeaveCriticalSection( &m_SessionLock );
}
return boResult;
}
4、发送会话的发送列表的数据(发送线程)
遍历发送线程的指定的那些会话,处理该会话上的数据的发送
VOID ExecSockDataMgr::CheckSendSessionBuffers(PEXECDATASENDTHREAD pSendThread)
{
int nErr, nRemainSize;
char *pBuffer;
PRUNGATEUSERSESSION *pSessionList = m_SessionList;
PRUNGATEUSERSESSION pSession;
pSendThread->boSendEWouldBlock = false;
pSendThread->boSendFewBuffer = false;
//这里没有加会话列表锁是因为,在开始时就分配65535的会话列表大小,一个逻辑网关上不会有超出这个大小的有效会话数量,
//会话的释放会置空会话列表的成员,会话的nSocket 是在最后才初始化的,所以可以判断pSession->nSocket != INVALID_SOCKET 来判断会话的初始化
//后的有效性,根据pSession->boMarkToClose和pSession->boRemoteClosed 来判断会话是否被关闭
INT_PTR nCount = m_SessionList.count();
for ( INT_PTR nIndex = pSendThread->nThreadIdx; nIndex < nCount; nIndex += m_nSendThreadCount )
{
pSession = pSessionList[nIndex];
if (!pSession)
continue;
if ( pSession->nSocket != INVALID_SOCKET && !pSession->boMarkToClose && !pSession->boRemoteClosed )
{
if ( !pSession->boSendAvaliable )
{
if ( _getTickCount() >= pSession->dwSendTimeOut )//等到发送的时间再发送(每个会话每隔300ms才可发送)
{
pSession->boSendAvaliable = true;
pSession->dwSendTimeOut = 0;
}
else continue;
}
// 这里可以不加锁,因为数据接收处理线程回收会话资源是延时10s的(这里的锁需要验证)
//(如果实在要加锁,对于数据接收处理线程会修改会话,因为不会有其他发送线程使用该会话,所以可以是互斥量)
if ( TRYLOCK_SESSION_SEND( pSession ) )
{
nRemainSize = pSession->SendBuf.nOffset;
if ( nRemainSize > 4096 * 1024 )
{
UNLOCK_SESSION_SEND( pSession );
CloseSession( pSession );
nRemainSize = 0;
logWarn("关闭了一个发送数据队列大于4MB的连接。");
continue;
}
if ( nRemainSize )
{
pBuffer = pSession->SendBuf.lpBuffer;
#ifdef LINUX
nErr = ::send( pSession->nSocket, pBuffer, nRemainSize, MSG_NOSIGNAL);
#else
nErr = ::send( pSession->nSocket, pBuffer, nRemainSize, 0 );
#endif
if ( nErr > 0 )
{
pSession->nSendPacketCount++;
InterlockedExchangeAdd( (LONG*)&m_dwWaitSendUserSize, -nErr );
InterlockedExchangeAdd( (LONG*)&m_dwSendUserSize, nErr );
if ( nErr < nRemainSize )
{
pSendThread->boSendFewBuffer = true;
memcpy( pBuffer, &pBuffer[nErr], nRemainSize - nErr );
nRemainSize -= nErr;
pBuffer[nRemainSize] = 0;
pSession->SendBuf.nOffset = nRemainSize;
}
else
{
pBuffer[0] = 0;
pSession->SendBuf.nOffset = 0;
}
}
else if ( !nErr || WSAGetLastError() != WSAEWOULDBLOCK )
{
pSession->boRemoteClosed = true;
CloseSession( pSession );
InterlockedExchangeAdd( (LONG*)&m_dwWaitSendUserSize, -nRemainSize );
InterlockedExchangeAdd( (LONG*)&m_dwSendUserSize, nRemainSize );
pBuffer[0] = 0;
pSession->SendBuf.nOffset = 0;
}
else
{
pSession->boSendAvaliable = false;
pSession->dwSendTimeOut = _getTickCount() + RUNGATE_SENDCHECK_TIMEOUT;
pSendThread->boSendEWouldBlock = true;
}
}
UNLOCK_SESSION_SEND( pSession );
}
}
else
{
//会话关闭后减少待发送数据统计值
if ( (nRemainSize = pSession->SendBuf.nOffset) )
{
InterlockedExchangeAdd( (LONG*)&m_dwWaitSendUserSize, -nRemainSize );
pSession->SendBuf.nOffset = 0;
}
}
}
}
5、接收线程关闭接收出错的会话
INT CEpollRunSockProcesser::EpollSessions()
{
const int RecvBufSize = 1024;
char sRecvBuf[RecvBufSize];
PRUNGATEUSERSESSION pSession;
int nRecv, nFdSSCount=0;
INT_PTR nSessionCount = m_pExecSockDataMgr->GetSessionCount();
//监听接收事件
nFdSSCount = ::epoll_wait(m_kdpfdr, &epfds[0], nSessionCount, 0);
if ( nFdSSCount > 0 )
{
while ( nFdSSCount > 0 )
{
nFdSSCount--;
pSession = (PRUNGATEUSERSESSION)epfds[nFdSSCount].data.ptr;
if (pSession->boMarkToClose || pSession->boRemoteClosed)//不会接收关闭了的会话的数据
{
//套接口被标志关闭了
pSession->boRemoteClosed = true;
delEpoll(pSession->nSocket,EPOLLIN | EPOLLERR | EPOLLPRI);
m_pExecSockDataMgr->CloseSession( pSession );
}
else if (epfds[nFdSSCount].events & (EPOLLERR | EPOLLPRI))
{
//套接口出现错误
pSession->boRemoteClosed = true;//接受数据线程关闭会话
delEpoll(pSession->nSocket,EPOLLIN | EPOLLERR | EPOLLPRI);
m_pExecSockDataMgr->CloseSession( pSession );
}
else
{
if (epfds[nFdSSCount].events & EPOLLIN)
{
//套接口准备好了读取操作
nRecv = ::recv( pSession->nSocket, sRecvBuf, RecvBufSize, 0 );
if ( nRecv > 0 )
{
//添加到数据包
m_pExecSockDataMgr->AddRecvBuf( pSession, pSession->nVerifyIdx, sRecvBuf, nRecv );
}
else
{
pSession->boRemoteClosed = true;
delEpoll(pSession->nSocket,EPOLLIN | EPOLLERR | EPOLLPRI);
m_pExecSockDataMgr->CloseSession( pSession );
}
}
}
epfds[nFdSSCount].events=0;//清除事件
}
}
return nSessionCount;
}