主函数基本流程为:
1. 初始化 epoll 树(边缘触发模式);
2. 初始化线程池;
3. 将监听套接字设置为非阻塞 IO;
4. 进入事件循环体函数。
// Entry point: set up SIGPIPE handling, the epoll instance, the worker
// thread pool and a non-blocking listening socket, then run the event loop.
int main()
{
    // Ignore SIGPIPE so that writing to a peer-closed socket cannot kill the server.
    handle_for_sigpipe();
    int epoll_fd = epoll_init();
    if (epoll_fd < 0)
    {
        perror("epoll init failed");
        return 1;
    }
    threadpool_t *threadpool = threadpool_create(THREADPOOL_THREAD_NUM, QUEUE_SIZE, 0);
    if (threadpool == NULL)
    {
        // fix: pool creation can fail; it was previously used unchecked.
        fprintf(stderr, "threadpool create failed\n");
        return 1;
    }
    int listen_fd = socket_bind_listen(PORT);
    if (listen_fd < 0)
    {
        perror("socket bind failed");
        return 1;
    }
    if (setSocketNonBlocking(listen_fd) < 0)
    {
        perror("set socket non block failed");
        return 1;
    }
    // Register the listening socket: readable, edge-triggered.
    __uint32_t event = EPOLLIN | EPOLLET;
    requestData *req = new requestData();
    req->setFd(listen_fd);
    epoll_add(epoll_fd, listen_fd, static_cast<void*>(req), event);
    // Event loop: wait for ready events, dispatch them, then reap expired timers.
    while (true)
    {
        int events_num = my_epoll_wait(epoll_fd, events, MAXEVENTS, -1);
        if (events_num == 0)
            continue;
        printf("%d\n", events_num);
        // Walk the ready-event array and dispatch by descriptor type.
        handle_events(epoll_fd, listen_fd, events, events_num, PATH, threadpool);
        handle_expired_event();
    }
    return 0;
}
先看两个函数,handle_events(...)函数
它根据 epoll 返回的就绪事件,把对应连接的处理任务提交给线程池执行。
// Dispatch handler: walk the ready events returned by epoll and route each
// one either to the accept path (listening socket) or to the thread pool.
void handle_events(int epoll_fd, int listen_fd, struct epoll_event* events, int events_num, const string &path, threadpool_t* tp)
{
    for(int i = 0; i < events_num; i++)
    {
        // Recover the per-connection object stashed in epoll's user data.
        requestData* request = (requestData*)(events[i].data.ptr);
        int fd = request->getFd();
        if(fd == listen_fd)
        {
            // Ready event on the listening socket: accept pending connections.
            acceptConnection(listen_fd, epoll_fd, path);
        }
        else
        {
            // Drop connections that reported an error/hangup or that are
            // not actually readable.
            if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)
                    || (!(events[i].events & EPOLLIN)))
            {
                printf("error event\n");
                delete request;
                continue;
            }
            // Detach the timer before handing the request to a worker so the
            // timeout sweep cannot free the request while it is being processed.
            request->seperateTimer();
            // fix: the return value of threadpool_add was stored and silently
            // ignored; a full queue or a shutting-down pool dropped the task
            // without any trace. Report the failure.
            int rc = threadpool_add(tp, myHandler, events[i].data.ptr, 0);
            if (rc != 0)
                fprintf(stderr, "threadpool_add failed (rc=%d)\n", rc);
        }
    }
}
这个函数是通过堆来处理超时连接的
void handle_expired_event()
{
pthread_mutex_lock(&qlock);
while (!myTimerQueue.empty())
{
mytimer *ptimer_now = myTimerQueue.top();
if (ptimer_now->isDeleted())
{
myTimerQueue.pop();
delete ptimer_now;
}
else if (ptimer_now->isvalid() == false)
{
myTimerQueue.pop();
delete ptimer_now;
}
else
{
break;
}
}
pthread_mutex_unlock(&qlock);
}主函数中使用了几个全局变量如下‘
// --- Global configuration and shared state used by main() ---
const int THREADPOOL_THREAD_NUM = 4;    // number of worker threads in the pool
const int QUEUE_SIZE = 65535;           // capacity of the thread pool's task queue
const int PORT = 8888;                  // TCP port the server listens on
const int ASK_STATIC_FILE = 1;          // request type tag (presumably static file) -- confirm against handler
const int ASK_IMAGE_STITCH = 2;         // request type tag (presumably image stitch) -- confirm against handler
const string PATH = "/";                // document root passed to handle_events
const int TIMER_TIME_OUT = 500;         // per-connection timeout value -- assumes milliseconds, TODO confirm
extern pthread_mutex_t qlock;           // protects myTimerQueue (shared with workers)
extern struct epoll_event* events;      // ready-event array filled by my_epoll_wait
// Forward declaration: drains all pending connections on the listening socket.
void acceptConnection(int listen_fd, int epoll_fd, const string &path);
// Min-heap of connection timers, ordered by timerCmp.
extern priority_queue<mytimer*, deque<mytimer*>, timerCmp> myTimerQueue;再简单看看接收连接函数
// Drain the edge-triggered listening socket: accept every pending connection,
// make it non-blocking, register it with epoll (one-shot so only one worker
// handles it at a time), and attach a timeout timer.
void acceptConnection(int listen_fd, int epoll_fd, const string &path)
{
    struct sockaddr_in client_addr;
    memset(&client_addr, 0, sizeof(struct sockaddr_in));
    // fix: addrlen is a value-result argument; it was initialized to 0, which
    // told accept() to truncate the peer address to zero bytes, leaving
    // client_addr permanently empty. It must start at sizeof(client_addr).
    socklen_t client_addr_len = sizeof(client_addr);
    int accept_fd = 0;
    while((accept_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len)) > 0)
    {
        // Reset for the next iteration (accept may have shrunk it).
        client_addr_len = sizeof(client_addr);
        // Connection sockets must be non-blocking for edge-triggered epoll.
        int ret = setSocketNonBlocking(accept_fd);
        if (ret < 0)
        {
            perror("Set non block failed!");
            // fix: the accepted fd was leaked and the drain loop aborted with
            // `return`; close this socket and keep accepting the rest.
            close(accept_fd);
            continue;
        }
        requestData *req_info = new requestData(epoll_fd, accept_fd, path);
        // Readable, edge-triggered, one-shot: guarantees a socket is handled
        // by at most one thread at any moment.
        __uint32_t _epo_event = EPOLLIN | EPOLLET | EPOLLONESHOT;
        epoll_add(epoll_fd, accept_fd, static_cast<void*>(req_info), _epo_event);
        // Attach a timeout timer and publish it on the shared timer heap.
        mytimer *mtimer = new mytimer(req_info, TIMER_TIME_OUT);
        req_info->addTimer(mtimer);
        pthread_mutex_lock(&qlock);
        myTimerQueue.push(mtimer);
        pthread_mutex_unlock(&qlock);
    }
    // accept() returning -1 with EAGAIN/EWOULDBLOCK just means the backlog
    // is drained; other errno values could be logged here if needed.
}
版权声明:本文为wwxy1995原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。