第15章 进程池和线程池

15.1 进程池和线程池概述

        进程池和线程池相似,进程池是由服务器预先创建的一组子进程,这些子进程的数据在3-10个之间。线程池中的线程数量应该和CPU数量差不多。

        进程池中的所有子进程都运行着相同的代码,并具有相同的属性。因为进程池在服务启动之初就创建好了,所以每个子进程都没有打开不必要的文件描述符,也不会错误地使用大块的堆内存。

        当有新的任务到来时,主进程将通过某种方式选择进程池中的某一个子进程来为之服务。相比于动态创建子进程,选择一个已经存在的子进程的代价要小得多。至于主进程选择哪个子进程来为新任务服务,有以下两种方式:

(1)主进程使用某种算法来主动选择子进程。最简单、最常用的算法是随机算法和Round Robin(轮流算法),但更优秀、更智能的算法将使任务在各个工作进程中更均匀地分配,从而减轻服务器的整体压力。

(2)主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。当有新的任务到来时,主进程将任务添加到工作队列中。这将唤醒正在等待任务的子进程,不过只有一个子进程可以从工作队列中取出任务并执行之,而其他子进程将继续睡眠在工作队列上。

        当选择好子进程后,主进程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。最简单的方法是,在父进程和子进程之间预先建立好一条管道,然后通过该管道来实现所有的进程间通信(要预先定义好一套协议来规范管道的使用)。在父线程和子线程之间传递数据就要简单的多,因为我们可以把这些数据定义为全局的,那么它们本身就是被所有线程共享的。

        综上,进程池的一般模型描绘为下图形式:

15.2 处理多客户

        在使用线程池处理多客户任务时,首先要要考虑的一个问题是:监听socket和连接socket是否都由主进程来统一管理。其中:

        半同步/半反应堆模式由主进程统一管理这两种socket的;

        高效的半同步/半异步模式和领导者/追随者模式,则是主进程管理监听socket,子进程管理属于自己的连接socket。

        在设计进程池时还要考虑如下:

        当客户任务是无状态的,我们可以使用不同的子进程来为该客户的不同请求服务。

        当客户任务存在上下文关系,则最好一直使用同一个子进程来为之服务。在前面我们讨论了epoll的EPOLLONESHOT事件,能够确保一个客户连接在整个生命周期中仅被一个线程处理。

15.3 半同步/半异步进程池实现

        下面我们实现一个半同步/半异步并发模式的进程池。为了避免在父、子进程之间传递文件描述符,我们将接受新连接的操作放到子进程中。显然对于这种模式,一个客户连接上的所有任务始终是由一个子进程来处理的。

        该进程池内部实现详细图如下:

#ifndef PROCESSPOOL_H
#define PROCESSPOOL_H
 
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>
 
//描述一个子进程的类,m_pid是目标子进程的PID,m_pipefd是父进程和子进程通信用的管道
class process
{
public:
    process() : m_pid(-1){}
public:
    pid_t m_pid;
    int m_pipefd[2];
};
 
//进程池类,将它定义为模板类是为了代码复用。其模板参数是处理逻辑任务的类
template<typename T>
class processpool
{
private:
    //将构造函数定义为私有的,因此我们只能通过后面的create静态函数来创建processpool实例
    processpool(int listenfd, int process_number = 8);
public:
    //单例模式,以保证程序最多创建一个processpool实例,这是程序正确处理信号的必要条件
    static processpool<T>* create(int listenfd, int process_number = 8)
    {
        if (!m_instance)
        {
            m_instance = new processpool<T>(listenfd, process_number);
        }
        return m_instance;
    }
    ~processpool()
    {
        delete [] m_sub_process;
    }
    //启动进程池
    void run();
private:
    void setup_sig_pipe();
    void run_parent();
    void run_child();
private:
    //进程池允许最大子进程数量
    static const int MAX_PROCESS_NUMBER = 16;
    //每个子进程最多能处理得客户数量
    static const int USER_PER_PROCESS = 65536;
    //epoll最多能处理的事件数
    static const int MAX_EVENT_NUMBER = 10000;
    //进程池中的进程总数
    int m_process_number;
    //子进程在池中的序号,从0开始
    int m_idx;
    //每个进程都有一个epoll内核事件表,用m_epollfd标识
    int m_epollfd;
    //监听socket
    int m_listenfd;
    //子进程通过m_stop来决定是否停止运行
    int m_stop;
    //保存所有子进程的描述信息
    process* m_sub_process;
    //进程池静态实例
    static processpool<T>* m_instance;
};
 
template<typename T>
processpool<T>* processpool<T>::m_instance = NULL;
//用于处理信号的管道,以实现统一事件源,后面称之为信号通道
static int sig_pipefd[2];
 
static void setnonblocking(int fd)
{
    int flag = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flag|O_NONBLOCK);
}
 
static void addfd(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}
 
//从epollfd标识的epoll内核事件表中删除fd上的所有注册事件
static void removefd(int epollfd, int fd)
{
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
    close(fd);
}
 
static void sig_handler(int sig)
{
    int save_errno = errno;
    int msg = sig;
    send(sig_pipefd[1], (char*)&msg, 1, 0);
    errno = save_errno;
}
 
static void addsig(int sig, void(handler)(int), bool restart = true)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if (restart)
    {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    assert(sigaction(sig, &sa, NULL) != -1);
}
 
/*进程池构造函数。参数listenfd是监听socket,它必须在创建进程池之前被创建,否则子进程无法直接引用它。
参数process_number指定进程池子中子进程的数量
*/
template<typename T>
processpool<T>::processpool(int listenfd, int process_number)
    :m_listenfd(listenfd), m_process_number(process_number), m_idx(-1),
    m_stop(false)
{
    assert((process_number > 0) && (process_number <= MAX_PROCESS_NUMBER));
    m_sub_process = new process[process_number];
    assert(m_sub_process);
 
    //创建process_number个子进程,并建立它们和父进程之间的管道
    for(int i=0; i<process_number; i++)
    {
        int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd);
        assert(ret == 0);
 
        m_sub_process[i].m_pid = fork();
        assert(m_sub_process[i].m_pid >= 0);
        //父进程使用m_pipefd[0],子进程使用m_pipefd[1]
        if (m_sub_process[i].m_pid > 0)     //父进程
        {
            close(m_sub_process[i].m_pipefd[1]);
            continue;
        }
        else            //子进程
        {
            close(m_sub_process[i].m_pipefd[0]);
            m_idx = i;
            break;
        }
    }
}
 
//统一事件源
template<typename T>
void processpool<T>::setup_sig_pipe()
{
    //创建epoll事件监听表和信号管道
    m_epollfd = epoll_create(5);
    assert(m_epollfd != -1);
 
    //创建管道
    /*sockpair函数创建的管道是全双工的,不区分读写端
    此处我们假设sig_pipefd[1]为写端,非阻塞,sig_pipefd[0]为读端
    */
    int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
    assert(ret != -1);
 
    setnonblocking(sig_pipefd[1]);
    addfd(m_epollfd, sig_pipefd[0]);    //注册sig_pipefd[0]上的可读事件
 
    //设置信号处理函数
    addsig(SIGCHLD, sig_handler);
    addsig(SIGTERM, sig_handler);
    addsig(SIGINT, sig_handler);
    addsig(SIGPIPE, SIG_IGN);
}
 
//父进程中m_idx=-1,子进程中m_idx>=0。据此判断接下来要运行的是父进程代码还是子进程代码
template<typename T>
void processpool<T>::run()
{
    if (m_idx != -1)
    {
        run_child();
        return;
    }
    run_parent();
}
 
template<typename T>
void processpool<T>::run_child()
{
    setup_sig_pipe();
    
    //每个子进程都通过其在进程池中的序号值m_idx找到与父进程通信的管道
    int pipefd = m_sub_process[m_idx].m_pipefd[1];
    //子进程需要监听管道文件描述符pipefd,因为父进程将通过它来通知子进程accept新连接
    addfd(m_epollfd, pipefd);
    
    epoll_event events[MAX_EVENT_NUMBER];
    T* users = new T[USER_PER_PROCESS];
    assert(users);
    int number = 0;
    int ret = -1;
 
    while(!m_stop)
    {
        number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
        //epoll_wait失败并且不是接收到信号导致的
        if ((number < 0) && (errno != EINTR))
        {
            printf("epoll failure\n");
            break;
        }
        
        for (int i = 0; i<number; i++)
        {
            int sockfd = events[i].data.fd; //获取文件描述符
            if ((sockfd == pipefd) && (events[i].events & EPOLLIN))
            {
                int client = 0;
                //从父、子进程之间的管道读取数据,并将结果保存在变量client中。
                //如果读取成功,则表示有新客户连接到来
                ret = recv(sockfd, (char*)&client, sizeof(client), 0);
                //接受数据失败或接受数据为空
                if (((ret<0) && (errno!=EAGAIN)) || ret==0)
                {
                    continue;
                }
                else
                {
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof(client_address);
                    int connfd = accept(m_listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                    if (connfd < 0)
                    {
                        printf("errno is:%d\n", errno);
                        continue;
                    }
                    addfd(m_epollfd, connfd);
                    //模板类T必须实现init方法,以初始化一个客户链接。
                    //我们直接使用connfd来索引逻辑处理对象(T类型对象),以提高程序效率
                    users[connfd].init(m_epollfd, connfd, client_address);
                }
            }
            //下面处理子进程接收到的信号
            else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN))
            {
                int sig;
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                if(ret <= 0)
                {
                    continue;
                }
                else
                {
                    for (int i=0; i<ret; i++)
                    {
                        switch(signals[i])
                        {
                            case SIGCHLD:   //回收子进程
                            {
                                pid_t pid;
                                int stat;
                                while((pid = waitpid(-1, &stat, WNONHANG)) > 0)
                                {
                                    continue;
                                }
                            }
                            case SIGTERM:
                            case SIGINT:    //键盘输入ctrl+c中断进程
                            {
                                m_stop = true;
                                break;
                            }
                            default:
                                break; 
                        }
                    }
                }
            }
            //如果是其他可读数据,那么必然是客户端发来的其他数据。调用逻辑处理对象的process方法处理。
            else if (events[i].events & EPOLLIN)
            {
                users[sockfd].process();
            }
            else
            {
                continue;
            }
        }
    }
    delete []users;
    users = NULL;
    close(pipefd);
    //我们将下面一条语句注释掉,因为:应该有m_listenfd的创建者来关闭这个文件描述符,即所谓的
    //"对象(如一个文件描述符,又或一段堆内存)由哪个函数创建,就由哪个函数销毁"
    //close(m_listenfd);
    close(m_epollfd);
}
 
 template<typename T>
 void processpool<T>::run_parent()
 {
    setup_sig_pipe();
    //父进程监听m_listenfd
    addfd(m_epollfd, m_listenfd);

    epoll_event events[MAX_EVENT_NUMBER];
    int sub_process_counter = 0;
    int new_conn = 1;
    int number = 0;
    int ret = -1;
    while(!m_stop)
    {
        number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
        if ((number < 0)) && (errno != EINTR)
        {
            printf("epoll failuer\n");
            break;
        }
        for (int i = 0; i<number; i++)
        {
            int sockfd = events[i].data.fd;
            if (sockfd == m_listenfd)
            {
                //如果有新连接到来,就采用轮流的方式将其分配给一个子进程处理
                int i = sub_process_counter;
                //寻找被fork出来的子进程
                do
                {
                    //此时的m_sub_process[i].m_pid是fork()后子进程的pid,如果其值为-1,说明m_sub_process[i]还没被fork(),
                    //接下来父进程通过m_sub_process[i].m_pipefd[0]向对应的子进程发送通知,从而实现父进程调用其子进程接受连接
                    if (m_sub_process[i].m_pid != -1)
                    {
                        break;
                    }
                    i = (i + 1) % m_process_number;
                }while(i != sub_process_counter);
                //未找到被fork出来的子进程
                if (m_sub_process[i].m_pid == -1)
                {
                    m_stop = true;
                    break;
                }
            }
            //下面处理父进程接收到的信号
            else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN))
            {
                int sig;
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                if (ret <= 0)
                {
                    continue;
                }
                else
                {
                    for (int i=0; i<ret; i++)
                    {
                        switch(signals[i])
                        {
                            case SIGCHLD:   //子进程退出
                            {
                                pid_t pid;
                                int stat;
                                while((pid = waitpid(-1, &stat, WNONHANG)) > 0)
                                {
                                    for (int i=0; i<m_process_number; i++)
                                    {
    //如果进程池中第i个子进程退出了,则主进程关闭对应的通信管道,并设置相应的m_pid为-1,以标记该子进程已经退出
                                        if (m_sub_process[i].m_pid == pid)
                                        {
                                            printf("child %d join\n", i);
                                            close(m_sub_process[i].m_pipefd[0]);
                                            m_sub_process[i].m_pid = -1;
                                        }
                                    }
                                }
                            //如果所有子进程都已经退出了,则父进程也退出
                                m_stop =  true;
                                for(int i=0; i<m_process_number; i++)
                                {
                                    //如果有子进程的pid不为-1,说明还有没退出的子进程
                                    if (m_sub_process[i].m_pid != -1)
                                    {
                                        m_stop = false;
                                    }
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:    //用户按下中断键(Delete或Ctrl+C)
                            {
        //如果父进程接受到终止信号,那么就杀死所有的子进程,并等待它们全部结束。当然,通知子进程结束更好的
        //方法是向父、子进程之间的通信管道发送特殊数据
                                printf("kill all the child now\n");
                                for (int i=0; i<m_process_number; i++)
                                {
                                    int pid = m_sub_process[i].m_pid;
                                    if (pid != -1)
                                    {
                                        kill(pid, SIGTERM); //发送终止进程的命令
                                    }
                                }
                                break;
                            }
                            default:
                            {
                                break;
                            }
                        }
                    }
                }
                else 
                {
                    continue;
                }
            }

        }
    }
    //close(m_listenfd); /*由创建者关闭这个文件描述符*/
    close(m_epollfd);
 }
 
#endif

15.4 半同步/半反应堆线程池实现

        相比于进程池实现,该线程池的通用性要高很多,因为它使用一个工作队列完全解除了主线程和工作线程的耦合关系:主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。不过,如果要将该线程池应用到实际服务器程序中,我们必须要保证所有客户请求都是无状态的,因为同一个连接上的不同请求可能会由不同的线程处理。

locker.h

#ifndef LOCKER_H
#define LOCKER_H

#include <exception>
#include <pthread.h>
#include <semaphore.h>

//封装信号量的类
class sem
{
public:
    //创建并初始化信号量
    sem()
    {
        if (sem_init(&m_sem, 0, 0) != 0)
        {
            //构造函数没有返回值,可以通过抛出异常来报告错误
            throw std::exception();
        }
    }
    //销毁信号量
    ~sem()
    {
        sem_destroy(&m_sem);
    }
    //等待信号量,将m_sem值减1
    bool wait()
    {
        return sem_wait(&m_sem) == 0;
    }
    //增加信号量,将信号量的值m_sem加1
    bool post()
    {
        return sem_post(&m_sem) == 0;
    }
private:
    sem_t m_sem;
};

//封装互斥锁类
class locker
{
public:
    //创建并初始化互斥锁
    locker()
    {
        if (pthread_mutex_init(&m_mutex, NULL) != 0)
        {
            throw std::exception();
        }
    }
    //销毁互斥锁
    ~locker()
    {
        pthread_mutex_destroy(&m_mutex);
    }
    //获取互斥锁
    bool lock()
    {
        return pthread_mutex_lock(&m_mutex) == 0;
    }
    //释放互斥锁
    bool unlock()
    {
        return pthread_mutex_unlock(&m_mutex) == 0;
    }
private:
    pthread_mutex_t m_mutex;
};

//封装条件变量的类
class cond
{
public:
    //创建并初始化条件变量
    cond()
    {
        if (pthread_mutex_init(&m_mutex, NULL) != 0)
        {
            throw std::exception();
        }
        if (pthread_cond_init(&m_cond, NULL) != 0)
        {
            //构造函数中一旦出现问题,就应该立即释放已经成功分配了的资源
            pthread_mutex_destroy(&m_mutex);
            throw std::exception();
        }
    }
    //销毁条件变量
    ~cond()
    {
        pthread_mutex_destroy(&m_mutex);
        pthread_cond_destroy(&m_cond);
    }
    //等待条件变量
    bool wait()
    {
        int ret = 0;
        pthread_mutex_lock(&m_mutex);
        ret = pthread_cond_wait(&m_cond, &m_mutex);
        pthread_mutex_unlock(&m_mutex);
        return ret = 0;
    }
    //唤醒等待条件变量的线程
    bool signal()
    {
        return pthread_cond_signal(&m_cond) == 0;
    }
private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
};

#endif

threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>

template<typename T>
class threadpool
{
public:
    //参数thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理请求的数量
    threadpool(int thread_number=8, int max_requests=10000);
    ~threadpool();
    //往请求队列中添加任务
    bool append(T* request);
private:
    //工作线程运行的函数,它不断从工作队列中取出任务并执行之
    static void* worker(void* arg);
    void run();
private:
    int m_thread_number;    //线程池中的线程数
    int m_max_requests;     //请求队列中允许的最大请求数
    pthread_t* m_threads;   //描述线程池的数组,其大小为m_thread_number;
    std::list<T*> m_workqueue;  //请求队列
    locker m_queuelocker;   //封装类locker对象,保护请求队列的互斥锁
    sem m_queuestat;        //封装类sem对象,是否有任务需要处理
    bool m_stop;            //是否结束线程
};

template<typename T>
threadpool<T>::threadpool(int thread_number, int max_requests):
    m_thread_number(thread_number), m_max_requests(max_requests),
    m_stop(false), m_threads(NULL)
{
    if ((thread_number <= 0) || (max_requests <= 0))
    {
        throw std::exception();
    }
    m_threads = new pthread_t[m_thread_number];
    if (!m_threads)
    {
        throw std::exception();
    }
    //创建thread_number个线程,并将他们都设置为脱离线程
    for (int i=0; i<thread_number; i++)
    {
        printf("create the %dth thread\n", i);
        if (pthread_create(m_threads + i, NULL, worker, this) != 0)
        {
            delete [] m_threads;
            throw std::exception();
        }
        if (pthread_detach(m_threads[i]) != 0)
        {
            delete [] m_threads;
            throw std::exception();
        }
    }
}

template<typename T>
threadpool<T>::~threadpool()
{
    delete [] m_threads;
    m_stop = true;
}

template<typename T>
bool threadpool<T>::append(T* request)
{
    //操作工作队列时一定要加锁,因为它被所有线程共享
    m_queuelocker.lock();
    //若请求队列的大小大于请求队列允许的最大请求数,则不再添加请求
    if (m_workqueue.size() > m_max_requests)
    {
        m_queuelocker.unlock();
        return false;
    }
    m_workqueue.push_back(request);
    m_queuelocker.unlock();
    m_queuestat.post();     //信号量的值加1,相当于生产者生产
    return true;
}

template<typename T>
void* threadpool<T>::worker(void* arg)
{
    threadpool* pool = (threadpool*) arg;
    pool->run();
    return pool;
}

template<typename T>
void threadpool<T>::run()
{
    while(!m_stop)
    {
        m_queuestat.wait(); //信号量值减1,相当于消费者消费
        m_queuelocker.lock();   //从任务队列中取出任务前先加锁
        if (m_workqueue.empty())
        {
            m_queuelocker.unlock();
            continue;
        }
        T* request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();    //从任务队列中取出任务后解锁
        if (!request)
        {
            continue;
        }
        request->process(); //指定任务
    }
}

#endif

        需要注意的是,在C++程序中使用pthread_create函数是,该函数的第3个参数必须是指向一个静态函数。而要在一个静态函数中使用类的动态成员(包括成员函数和成员变量),则只能通过如下两种方式实现:

(1)通过类的静态对象来调用。比如单例模式中,静态函数可以通过类的全局唯一实例来访问动态成员函数。

(2)将类的对象作为参数传递给该静态函数,然后在静态函数中引用这个对象,并调用其动态方法。

代码threadpool.h中的worker()函数中用的是第2种方式:将线程参数设置为this指针,然后在woker函数种获取该指针并调用其动态方法run。

15.5 用线程池实现简单的web服务器

        在第八章种,我们曾使用有限状态机实现过一个非常简单的解析HTTP请求的服务器。下面我们将利用前面介绍的线程池来重新实现一个并发的web服务器。

15.5.1 http_conn类

        首先,我们需要准备线程池的模板参数类,用以封装对逻辑任务的处理。这个类是http_conn,下面的代码时其头文件http_conn.h和其实现文件http_conn.cpp.

http_conn.h

#ifndef HTTPCONNECTION_H
#define HTTPCONNECTION_H

#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <stdarg.h>
#include <errno.h>
#include <errno.h>
#include "locker.h"

class http_conn
{
public:
    //文件名的最大长度
    static const int FILENAME_LEN = 200;
    //读缓冲区大小
    static const int READ_BUFFER_SIZE = 2048;
    //写缓冲区大小
    static const int WRITE_BUFFER_SIZE = 1024;
    //HTTP请求方法,但我们仅支持GET
    enum METHOD{GET=0, POST, HEAD, PUT, DELETE, TRACE, OPTIONS, CONNECT, PATHC};
    //解析客户请求时,主状态机所处的状态
    enum CHECK_STATE{CHECK_STATE_REQUESTLINE = 0,
                    CHECK_STATE_HEADER, CHECK_STATE_CONTENT};
    //服务器处理HTTP请求的可能结果
    enum HTTP_CODE{NO_REQUEST, GET_REQUEST, BAD_REQUEST, NO_RESOURCE, FORBIDDEN_REQUEST,
                    FILE_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION};
    //行的读取状态
    enum LINE_STATUS{LINE_OK = 0, LINE_BAD, LINE_OPEN};
public:
    http_conn(){}
    ~http_conn(){}
public:
    //初始化新接受的连接
    void init(int sockfd, const sockaddr_in& addr);
    //关闭连接
    void close_conn(bool real_close = true);
    //处理客户请求
    void process();
    //非阻塞读操作
    bool read();
    //非阻塞写操作
    bool write();
private:
    //初始化连接
    void init();
    //解析HTTP请求
    HTTP_CODE process_read();
    //填充HTTP应答
    bool process_write(HTTP_CODE ret);

    //下面这一组函数被process_read调用以分析HTTP请求
    HTTP_CODE parse_request_line(char* text);
    HTTP_CODE parse_headers(char* text);
    HTTP_CODE parase_content(char* text);
    HTTP_CODE do_request();
    char* get_line(){return m_read_buf + m_start_line;}
    LINE_STATUS parse_line();

    //下面这一组函数被process_write调用以填充HTTP应答
    void unmap();
    bool add_response(const char* format, ...);
    bool add_content(const char* content);
    bool add_status_line(int status, const char* title);
    bool add_headers(int content_length);
    bool add_content_length(int content_length);
    bool add_linger();
    bool add_blank_line();
public:
    //所有socket上的事件都被注册到同一个epoll内核事件表种,所以将epoll文件描述符设置为静态的
    static int m_epollfd;
    //统计用户数量
    static int m_user_count;
private:
    //读HTTP连接的socket和对方的socket地址
    int m_sockfd;
    sockaddr_in m_address;
    //读缓冲区
    char m_read_buf[READ_BUFFER_SIZE];
    //标识读缓冲区中已经读入的客户数据的最后一个字节的下一个位置
    int m_read_idx;
    //当前正在分析的字符在读缓冲区中的位置
    int m_checked_idx;
    //当前正在解析的行的起始位置
    int m_start_line;
    //写缓冲区
    char m_write_buf[WRITE_BUFFER_SIZE];
    //写缓冲区中待发送的字节数
    int m_write_idx;

    //主状态机当前所处的状态
    CHECK_STATE m_check_state;
    //请求方法
    METHOD m_method;

    //客户请求的目标文件的完整路径,其内容等于doc_root+m_url,doc_root是网站根目录
    char m_real_file[FILENAME_LEN];
    //客户请求的目标文件的文件名
    char* m_url;
    //HTTP协议版本号,我们仅支持HTTP/1.1
    char* m_version;
    //主机名
    char* m_host;
    //HTTP请求得消息体得长度
    int m_content_length;
    //HTTP请求是否要求保持连接
    bool m_linger;

    //客户请求的目标文件被mmap到内存中的起始位置
    char* m_file_address;
    //目标文件的状态。通过它我们可以判断文件是否存在、是否为目录、是否可读,并获取文件大小等信息
    struct stat m_file_stat;
    //我们将采用writev来执行写操作,所以定义下面两个成员,其中m_iv_count表示写内存快的数量
    struct iovec m_iv[2];
    int m_iv_count;
};

#endif

http_conn.cpp

#include "http_conn.h"

//定义HTTP响应的一些状态信息
const char* ok_200_title = "OK";
const char* error_400_title = "Bad Request";
const char* error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n";
const char* error_403_title = "Forbidden";
const char* error_403_form = "You do not have permission to get file from this server.\n";
const char* error_404_title = "Not Found";
const char* error_404_form = "The requestd file was not found on this server.\n";
const char* error_500_title = "internal Error";
const char* error_500_form = "There was an unusual proble serving the requested file.\n";
//网站的根目录
const char* doc_root = "/var/www/html";

int setnonblocking(int fd)
{
    int flag = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flag|O_NONBLOCK);
}

void addfd(int epollfd, int fd, bool one_shot)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
    if (one_shot)   //EPOLLONESHOT可使该fd在任意时刻都只能被一个线程处理
    {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

//从epollfd标识的epoll内核事件表中删除fd上的所有注册事件
static void removefd(int epollfd, int fd)
{
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
    close(fd);
}

//修改fd上注册的事件
void modfd(int epollfd, int fd, int ev)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

int http_conn::m_user_count = 0;
int http_conn::m_epollfd = -1;

void http_conn::close_conn(bool real_close)
{
    if (real_close && (m_sockfd != -1))
    {
        removefd(m_epollfd, m_sockfd);
        m_sockfd = -1;
        m_user_count--; //关闭一个连接时,将客户总量减1
    }
}

void http_conn::init(int sockfd, const sockaddr_in& addr)
{
    m_sockfd = sockfd;
    m_address = addr;
    //如下两个是为了避免TIME_WAIT状态,仅用于调试,实际使用时应该去掉
    //int reuse = 1;
    //setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    addfd(m_epollfd, sockfd, true);
    m_user_count++;

    init();
}

void http_conn::init()
{
    m_check_state = CHECK_STATE_REQUESTLINE;
    m_linger = false;

    m_method = GET;
    m_url = 0;
    m_version = 0;
    m_content_length = 0;
    m_host = 0;
    m_start_line = 0;
    m_checked_idx = 0;
    m_read_idx = 0;
    m_write_idx = 0;
    memset(m_read_buf, '\0', READ_BUFFER_SIZE);
    memset(m_write_buf, '\0', WRITE_BUFFER_SIZE);
    memset(m_real_file, '\0', FILENAME_LEN);
}

//从状态机
http_conn::LINE_STATUS http_conn::parse_line()
{
    char temp;
    for(; m_checked_idx < m_read_idx; m_checked_idx++)
    {
        temp = m_read_buf[m_checked_idx];
        if (temp == '\r')
        {
            if ((m_checked_idx + 1) == m_read_idx)
            {
                return LINE_OPEN;
            }
            else if (m_read_buf[m_checked_idx + 1] == '\n')
            {
                m_read_buf[m_checked_idx++] = '\0';
                m_read_buf[m_checked_idx++] = '\0';
                return LINE_OK;
            }
        }
        else if (temp == '\n')
        {
            if ((m_checked_idx > 1) && (m_read_buf[m_checked_idx-1] =='\r'))
            {
                m_read_buf[m_checked_idx-1] = '\0';
                m_read_buf[m_checked_idx++] = '\0';
                return LINE_OK;
            }
            return LINE_BAD;
        }
    }
    return LINE_OPEN;   //表示还要继续读取客户数据
}

//循环读取客户数据,直到无数据可读或者对方关闭连接
bool http_conn::read()
{
    if (m_read_idx >= READ_BUFFER_SIZE)
    {
        return false;
    }

    int bytes_read = 0;
    while(true)
    {
        bytes_read = recv(m_sockfd, m_read_buf+m_read_idx, READ_BUFFER_SIZE-m_read_idx, 0);
        if (bytes_read == -1)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                break;
            }
            return false;
        }
        else if (bytes_read == 0)
        {
            return false;
        }

        m_read_idx += bytes_read;
    }
    return true;
}

//解析HTTP请求行,获得请求方法、目标url,以及HTTP版本号
http_conn::HTTP_CODE http_conn::parse_request_line(char* text)
{
    m_url = strpbrk(text, " \t");
    if (!m_url)
    {
        return BAD_REQUEST;
    }
    *m_url++='\0';
    char* method = text;
    if (strcasecmp(method, "GET")==0)
    {
        m_method = GET;
    }
    else
    {
        return BAD_REQUEST;
    }
    m_url += strspn(m_url, " \t");
    m_version = strpbrk(m_url, " \t");
    if (m_version)
    {
        return BAD_REQUEST;
    }
    *m_version++ = '\0';
    m_version += strspn(m_version, " \t");
    if (strcasecmp(m_version, "HTTP/1.1") != 0)
    {
        return BAD_REQUEST;
    }
    if (strncasecmp(m_url, "http://", 7) != 0)
    {
        m_url += 7;
        m_url = strchr(m_url, '/');
    }
    if (!m_url || m_url[0] != '/')
    {
        return BAD_REQUEST;
    }

    m_check_state = CHECK_STATE_HEADER;
    return NO_REQUEST;
}

//解析HTTP请求的一个头部信息
http_conn::HTTP_CODE http_conn::parse_headers(char* text)
{
    //遇到空行,表示头部字段解析完毕
    if(text[0] == '\0')
    {
        if (m_content_length != 0)
        {
            m_check_state = CHECK_STATE_CONTENT;
            return NO_REQUEST;
        }
        return GET_REQUEST;
    }
    //处理connection头部字段
    else if (strncasecmp(text, "Connection:", 11) == 0)
    {
        text += 11;
        text += strspn(text, " \t");
        if (strcasecmp(text, "keep-alive") == 0)
        {
            m_linger = true;
        }
    }
    //处理Content-Length头部字段
    else if (strncasecmp(text, "Content-Length:", 15) == 0)
    {
        text += 15;
        text += strspn(text, " \t");
        m_content_length = atol(text);
    }
    //处理Host头部字段
    else if (strncasecmp(text, "Host:", 5) == 0)
    {
        text += 5;
        text += strspn(text, " \t");
        m_host = text;
    }
    else
    {
        printf("oop! unknow header %s\n", text);
    }
    return NO_REQUEST;
}

//我们没有真正解析HTTP请求的消息体,只是判断它是否被完整地读入了
http_conn::HTTP_CODE http_conn::parase_content(char* text)
{
    if(m_read_idx >= (m_content_length + m_checked_idx))
    {
        text[m_content_length] = '\0';
        return GET_REQUEST;
    }
    return NO_REQUEST;
}

//主状态机
http_conn::HTTP_CODE http_conn::process_read()
{
    LINE_STATUS line_status = LINE_OK;
    HTTP_CODE ret = NO_REQUEST;
    char* text = 0;
    while (((m_check_state == CHECK_STATE_CONTENT) && (line_status == LINE_OK))
        || ((line_status = parse_line()) == LINE_OK))
    {
        text = get_line();
        m_start_line = m_checked_idx;
        printf("got 1 http line: %s\n", text);

        switch(m_check_state)
        {
            case CHECK_STATE_REQUESTLINE:
            {
                ret = parse_request_line(text);
                if (ret == BAD_REQUEST)
                {
                    return BAD_REQUEST;
                }
                break;
            }
            case CHECK_STATE_HEADER:
            {
                ret = parse_headers(text);
                if(ret == BAD_REQUEST)
                {
                    return BAD_REQUEST;
                }
                else if (ret == GET_REQUEST)
                {
                    return do_request();
                }
                break;
            }
            case CHECK_STATE_CONTENT:
            {
                ret = parase_content(text);
                if (ret == GET_REQUEST)
                {
                    return do_request();
                }
                line_status = LINE_OPEN;
                break;
            }
            default:
            {
                return INTERNAL_ERROR;
            }
        }
    }
    return NO_REQUEST;
}

/*当得到一个完整正确的HTTP请求时,我们就分析目标文件的属性。如果目标文件存在,对所有用户可读,
且不是目录,则使用mmap将其映射到内存地址m_file_address处,并告诉调用着获取文件成功
*/
http_conn::HTTP_CODE http_conn::do_request()
{
    strcpy(m_real_file, doc_root);
    int len = strlen(doc_root);
    strncpy(m_real_file + len, m_url, FILENAME_LEN - len -1);
    if (stat(m_real_file, &m_file_stat) < 0)
    {
        return NO_REQUEST;
    }

    if (!(m_file_stat.st_mode & S_IROTH))
    {
        return FORBIDDEN_REQUEST;
    }
    if (S_ISDIR(m_file_stat.st_mode))
    {
        return BAD_REQUEST;
    }
    int fd = open(m_real_file, O_RDONLY);
    m_file_address = (char*)mmap(0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    close(fd);
    return FILE_REQUEST;
}

//对内存映射区执行munmap操作,即取消内存映射
void http_conn::unmap()
{
    if (m_file_address)
    {
        munmap(m_file_address, m_file_stat.st_size);
        m_file_address = 0;
    }
}

//写HTTP响应
bool http_conn::write()
{
    int temp = 0;
    int bytes_have_send = 0;    //已发送的数据长度
    int bytes_to_send = m_write_idx;    //待发送的数据长度
    if (bytes_to_send == 0)
    {
        modfd(m_epollfd, m_sockfd, EPOLLIN);
        init();
        return true;
    }
    while(1)
    {
        temp = writev(m_sockfd, m_iv, m_iv_count);
        if (temp <= -1)
        {
            /*
            如果TCP写缓冲没有空间,则等待下一轮EPOLLOUT事件。虽然在此期间,服务器无法立即接受
            到同一客户的下一个请求,但这可以保证连接的完整性
            */
           if(errno == EAGAIN)
           {
                modfd(m_epollfd, m_sockfd, EPOLLOUT);   //缓冲区一旦有空闲空间,会触发EPOLLOUT事件,此时开始往m_sockfd写数据
                return true;
           }
           unmap();
           return false;
        }
        bytes_to_send -= temp;
        bytes_have_send += temp;
        if(bytes_to_send <= bytes_have_send)
        {
            //发送http响应成功,根据HTTP请求中的Connection字段决定是否立即关闭连接
            unmap();
            if(m_linger)
            {
                init();
                modfd(m_epollfd, m_sockfd, EPOLLIN);    //接收到普通数据会触发EPOLLIN
                return true;
            }
            else
            {
                modfd(m_epollfd, m_sockfd, EPOLLIN);
                return false;
            }
        }
    }
}

//往缓冲区中写入待发送的数据
bool http_conn::add_response(const char* format, ...)
{
    if(m_write_idx >= WRITE_BUFFER_SIZE)
    {
        return false;
    }
    va_list arg_list;
    va_start(arg_list, format);
    int len = vsnprintf(m_write_buf + m_write_idx, WRITE_BUFFER_SIZE-1-m_write_idx, format, arg_list);
    if(len >= (WRITE_BUFFER_SIZE-1-m_write_idx))
    {
        return false;
    }
    m_write_idx += len;
    va_end(arg_list);
    return true;
}

bool http_conn::add_status_line(int status, const char* title)
{
    return add_response("%s %d %s\r\n", "HTTP/1.1", status, title);
}

bool http_conn::add_headers(int content_len)
{
    add_content_length(content_len);
    add_linger();
    add_blank_line();
}

bool http_conn::add_content_length(int content_len)
{
    return add_response("Content-Length: %d\r\n", content_len);
}

bool http_conn::add_linger()
{
    return add_response("Connection: %s\r\n", (m_linger==true)?"keep-alive":"close");
}

bool http_conn::add_blank_line()
{
    return add_response("%s", "\r\n");
}

bool http_conn::add_content(const char* content)
{
    return add_response("%s", content);
}

//根据服务器处理HTTP请求得结果,决定返回给客户端的内容
bool http_conn::process_write(HTTP_CODE ret)
{
    switch (ret)
    {
        case INTERNAL_ERROR:
        {
            add_status_line(500, error_500_title);
            add_headers(strlen(error_500_form));
            if(!add_content(error_500_form))
            {
                return false;
            }
            break;
        }
        case BAD_REQUEST:
        {
            add_status_line(400, error_400_title);
            add_headers(strlen(error_400_form));
            if(!add_content(error_400_form))
            {
                return false;
            }
            break;
        }
        case NO_REQUEST:
        {
            add_status_line(400, error_400_title);
            add_headers(strlen(error_400_form));
            if (!add_content(error_400_form))
            {
                return false;
            }
            break;
        }
        case FORBIDDEN_REQUEST:
        {
            add_status_line(403, error_403_title);
            add_headers(strlen(error_403_form));
            if(!add_content(error_403_form))
            {
                return false;
            }
            break;
        }
        case FILE_REQUEST:
        {
            add_status_line(200, ok_200_title);
            if(m_file_stat.st_size != 0)
            {
                add_headers(m_file_stat.st_size);
                m_iv[0].iov_base = m_write_buf;
                m_iv[0].iov_len = m_write_idx;
                m_iv[1].iov_base = m_file_address;
                m_iv[1].iov_len = m_file_stat.st_size;
                m_iv_count = 2;
                return true;
            }
            else
            {
                const char* ok_string = "<html><body></body></html>";
                add_headers(strlen(ok_string));
                if(!add_content(ok_string))
                {
                    return false;
                }
            }
        }
        default:
        {
            return false;
        }
    }
    m_iv[0].iov_base = m_write_buf;
    m_iv[0].iov_len = m_write_idx;
    m_iv_count = 1;
    return true;
}

//由线程池中的工作线程调用,这是处理HTTP请求的入口函数
void http_conn::process()
{
    HTTP_CODE read_ret = process_read();
    if(read_ret == NO_REQUEST)  //请求不完整,需要继续读取客户数据
    {
        modfd(m_epollfd, m_sockfd, EPOLLIN);
        return;
    }

    bool write_ret = process_write(read_ret);   //返回给客户端的内容
    if (!write_ret)
    {
        close_conn();
    }
    modfd(m_epollfd, m_sockfd, EPOLLOUT);
}

        定义好任务类之后,main函数就变得很简单了,它只需要负责I/O读写,代码如main.cpp所示:

main.cpp

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <cassert>
#include <sys/epoll.h>

#include "locker.h"
#include "threadpool.h"
#include "http_conn.h"

#define MAX_FD 65536
#define MAX_EVENT_NUMBER 10000

extern int addfd(int epollfd, int fd, bool one_shot);
extern int removefd(int epollfd, int fd);

void addsig(int sig, void(handler)(int), bool restart = true)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if (restart)
    {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    assert(sigaction(sig, &sa, NULL) != -1);
}

void show_error(int connfd, const char* info)
{
    printf("%s", info);
    send(connfd, info, strlen(info), 0);
    close(connfd);
}

int main(int argc, char* argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);
    //忽略SIGPIPE信号:往读端被关闭的管道或socket连接中写数据
    addsig(SIGPIPE, SIG_IGN);
    //创建线程池
    threadpool<http_conn>* pool = NULL;
    try
    {
        pool = new threadpool<http_conn>;
    }
    catch(...)
    {
        return 1;
    }
    //预先为每个可能的客户连接分配一个http_conn对象
    http_conn* users = new http_conn[MAX_FD];
    assert(users);
    int user_count = 0;

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    struct linger tmp = {1, 0};
    setsockopt(listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret >= 0);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd, false);
    http_conn::m_epollfd = epollfd;

    while(true)
    {
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if ((number < 0) && (errno != EINTR))
        {
            printf("epoll failure\n");
            break;
        }
        for (int i=0; i<number; i++)
        {
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd)
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                if (connfd < 0)
                {
                    printf("errno is %d \n", errno);
                    continue;
                }
                if (http_conn::m_user_count >= MAX_FD)
                {
                    show_error(connfd, "Internal server busy");
                    continue;
                }
                //初始化客户连接
                users[connfd].init(connfd, client_address);
            }
            else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
            {
                //如果有异常,直接关闭客户连接
                users[sockfd].close_conn();
            }
            else if (events[i].events & EPOLLIN)
            {
                //根据读的结果,决定是将任务添加到线程池,还是关闭连接
                if (users[sockfd].read())
                {
                    pool->append(users + sockfd);
                }
                else
                {
                    users[sockfd].close_conn();
                }
            }
            else if (events[i].events & EPOLLOUT)
            {
                //根据写的结果,决定是否关闭连接
                if (!users[sockfd].write())
                {
                    users[sockfd].close_conn();
                }
            }
            else
            {
                //处理其他
            }
        }
    }
    close(epollfd);
    close(listenfd);
    delete [] users;
    delete pool;
    return 0;
}


版权声明:本文为qq_37070988原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。