muduo网络库设计与实现(一)

muduo网络库设计与实现(一)


这是根据陈硕的《linux多线程服务端编程》中的“ muduo网络库设计与实现”部分写的简单笔记。具体代码可以参考陈硕的github。虽然大体框架一样,但也有些许不同,文章最后也会把我的代码放出来。
这是第一部分,主要实现 Reactor模型的框架。

base

这部分是一些工具类的封装。最重要的是多线程相关的函数接口的封装,主要包括

  1. Thread:线程的创建和等待结束
  2. MutexLock:mutex的创建,销毁,加锁,解锁
  3. Condition:条件变量的创建,销毁,等待,通知,广播

除此之外还有更高层的封装,例如ThreadPool,CountDownLatch等。

noncopyable

有两种思路

  • 一种使用delete关键字修饰拷贝构造函数和拷贝赋值函数
  • 第二种将拷贝构造函数和拷贝赋值函数设置为private
class noncopyable {
public:
	noncopyable(const noncopyable&) = delete;
	const noncopyable& operator=( const noncopyable& ) = delete;
protected:
	noncopyable() = default;
	~noncopyable() = default;
};

Mutex

包括类MutexLock和类MutexLockGuard,这两个类都比较简单,所以就不过多介绍

class MutexLock : noncopyable{
public:
    MutexLock() : holder_(0){
        pthread_mutex_init(&mutex_, NULL);
    }

    ~MutexLock(){
        pthread_mutex_lock(&mutex_);
        pthread_mutex_destroy(&mutex_);
    }

    bool isLockByThisThread(){
        return holder_ == CurrentThread::tid();
    }

    void lock(){ //仅供 MutexLockGuard 调用
        pthread_mutex_lock(&mutex_);
        holder_ = CurrentThread::tid();
    }

    void unlock(){ //仅供 MutexLockGuard 调用
        holder_ = 0;
        pthread_mutex_unlock(&mutex_);
    }

    pthread_mutex_t* getPthreadMutex(){ //仅供 Condition 调用
        return &mutex_;
    }
private:
    pthread_mutex_t mutex_;
    pid_t holder_;
};
class MutexLockGuard : noncopyable{
public:
    explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex){
        mutex_.lock();
    }
    
    ~MutexLockGuard(){
        mutex_.unlock();
    }
private:
    MutexLock& mutex_;
};

Condition

class Condition : noncopyable{
public:
    explicit Condition(MutexLock& mutex) : mutex_(mutex){
        pthread_cond_init(&pcond_, NULL);
    }

    ~Condition(){
        pthread_cond_destroy(&pcond_);
    }

    void wait(){
        pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());
    }

    void notify(){
        pthread_cond_signal(&pcond_);
    }

    void notifyAll(){
        pthread_cond_broadcast(&pcond_);
    }

    // returns true if time out, false otherwise.
    bool waitForSeconds(int seconds){
        struct timespec abstime;
        clock_gettime(CLOCK_REALTIME, &abstime);
        abstime.tv_sec += static_cast<time_t>(seconds);
        return ETIMEDOUT == pthread_cond_timedwait(
            &pcond_, mutex_.getPthreadMutex(), &abstime);
    }
    
private:
    MutexLock& mutex_;
    pthread_cond_t pcond_;
};

CountDownLatch

CountDownLatch是一种常用且易用的同步手段,主要有两种用途:

  • 主线程发起多个子线程,等这些子线程各自完成一定任务后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化。
  • 主线程发起多个子线程,子线程都等待主线程,主线程完成其他一些任务后,通知所有子线程开始执行。通常用于多个子线程等待主线程发出“起跑”命令
class CountDownLatch : noncopyable{
public:
    //倒数几次
    explicit CountDownLatch(int count) : mutex_(), condition_(mutex_), count_(count) {}
    //等待计数值变为0
    void wait(){
        MutexLockGuard lock(mutex_);
        while (count_ > 0) condition_.wait();
    }
	//计数减1
    void countDown(){
        MutexLockGuard lock(mutex_);
        --count_;
        if (count_ == 0) condition_.notifyAll();
    }

    int getCount() const {
        MutexLockGuard lock(mutex_);
        return count_;
    }

private:
    mutable MutexLock mutex_;
    Condition condition_;
    int count_;
};

CurrentThread

这部分的功能是获取线程标识。因为各种原因pthread_t不适合作为线程的标识符,muduo中使用下面的函数调用的返回值作为线程的id。

pid_t gettid() { return static_cast<pid_t>(::syscall(SYS_gettid)); }

同时为了避免多次进行系统调用,使用__thread变量来缓存gettid()的返回值,这样只有在本线程第一次调用的时候才会进行系统调用,以后都直接从thread local缓存的线程id拿到结果。所以在获取本线程id时,应该使用下面的函数

inline int tid() {
  if (__builtin_expect(t_cachedTid == 0, 0)) {
    cacheTid();
  }
  return t_cachedTid;
}

__thread是GCC内置的线程局部存储设施,但需要注意它的使用规则:

  1. 只能用于修饰POD类型
  2. 只能修饰全局变量,函数内的静态变量;不能修饰函数的局部变量或class的普通成员变量
  3. 用__thread修饰的变量只能用编译期常量初始化。

Thread

这部分是对线程的封装,具体实现看代码,比较简单。像前文所说,最重要的是线程的创建和等待。需要关注的是Thread类的构造函数,析构函数,start(),join()。

//线程需要运行的函数,和线程的名称
explicit Thread(ThreadFunc, const std::string& name = std::string());

//如果线程已经开始,但还没join,那么将线程分离
Thread::~Thread() {
    if(started_ && !joined_) pthread_detach(pthreadId_);
}

void Thread::start() {
    assert(!started_);
    started_ = true;
    ThreadData* data = new ThreadData(func_, name_, &tid_, &latch_);
    if(pthread_create(&pthreadId_, NULL, &startThread, data)){
        started_ = false;
        delete data;
    }
    else{
        latch_.wait();
        assert(tid_ > 0);
    }
}

int Thread::join() {
    assert(started_);
    assert(!joined_);
    joined_ = true;
    return pthread_join(pthreadId_, NULL);
}

Reactor模型

muduo多线程模型

在进入具体代码之前,我们先简单介绍一下muduo的多线程模型。muduo使用one loop per thread和线程池的模式,基本上是个reactor的模型。

网络编程中有许多事务性的工作,可以提取为公用的框架,而用户只需要填上业务逻辑代码,并将回调注册到框架中,就可以实现完整的网络服务,这正是Reactor模式的主要思想。Reactor的意义在于将IO事件分发到用户提供的处理函数,并保持网络部分通用代码不变,独立于用户的业务逻辑。

一个单线程的Reactor逻辑是方程简单的,在没有事件的时候,线程等待在epoll_wait等函数上,事件到达后由网络库处理IO,再把消息通知(回调)客户端代码。

muduo使用的是one loop per thread模式,有一个main Reactor负责accept连接,然后把连接挂在某个sub Reactor中。这是muduo默认的多线程模型。

如果有需要,也可以用线程池来处理计算,也就是既使用多个Reactor来处理IO,又使用线程池来处理计算,这种方式适合既有突发IO,又有突发计算的应用。

总结muduo网络库,支持四种实用的方案:

  1. 单线程Reactor
  2. Reactor + 线程池
  3. one loop per thread
  4. one loop per thread + 线程池

在本节,我们会实现Reactor事件处理框架,在这的基础上,可以非常简单的扩展到one loop per thread模型。本节要实现的有一下几部分:

  • Eventloop:事件循环(反应器Reactor),每个线程只能有一个Eventloop实体,它负责IO和定时器事件的分派。它用TimerQueue作为计时器管理,用Epoller作为IO复用。
  • Epoller:封装了epoll的IO复用,它是Eventloop的成员,生命期由后者控制。
  • TimerQueue:用timerfd实现定时,用std::map来管理Timer。它是Eventloop的成员,生命期由后者控制
  • Channel:负责注册与响应IO事件,注意它不拥有文件描述符。
  • EventLoopThread:启动一个线程,在其中运行EventLoop::loop()

EventLoop

Eventloop的成员变量:

typedef std::vector<Channel*> ChannelList;
bool looping_; //是否正在进行循环
bool quit_; // 是否希望退出循环
bool callingPendingFunctors_; // 是否正在处理任务队列
const pid_t threadId_; // 该线程的id
int64_t pollReturnTime_; // 本轮epoll_wait返回的时间戳
std::shared_ptr<Epoller> poller_; // 该EventLoop的epoller
std::shared_ptr<TimerQueue> timerQueue_; //该EventLoop的定时器
ChannelList activeChannels_; // 用于epoller返回有事件的channel
int wakeupFd_; // 用于唤醒的fd
std::shared_ptr<Channel> wakeupChannel; // 用于唤醒的Channel
MutexLock mutex_; // 与任务队列相关的锁
std::vector<Functor> pendingFunctors_; // 任务队列 @GuardedBy mutex_

EventLoop的构造函数会记住本对象所属的线程threadId_,创建了EventLoop对象的线程是IO线程,其主要功能是运行事件循环EventLoop::loop()

void EventLoop::loop(){
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;
    while(!quit_){
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        for(auto it = activeChannels_.begin(); it != activeChannels_.end(); it++){
            (*it)->handleEvent();
        }
        doPendingFunctors();
    }
    looping_ = false;
}

EventLoop有几个定时器接口EventLoop::runAt()EventLoop::runAfter(),EventLoop::runEvery()这几个函数都转而调用TimerQueue::addTimer,注意几个函数允许跨线程使用,这就带来类线程安全问题。muduo解决办法不是加锁,而是把对TimerQueue的操作转移到IO线程来进行,这会用到EventLoop::runInLoop()

TimerId EventLoop::runAt(const int64_t& time, const TimerCallback& cb){
    return timerQueue_->addTimer(cb, time, 0.0);
}
TimerId EventLoop::runAfter(double delay, const TimerCallback& cb){
    int64_t time = addTime(get_now(), delay);
    return runAt(time, cb);
}
TimerId EventLoop::runEvery(double interval, const TimerCallback& cb){
    int64_t time = addTime(get_now(), interval);
    return timerQueue_->addTimer(cb, time, interval);
}

EventLoop::runInLoop()是非常有用的功能,它可以在它的IO线程中执行某用户的任务回调。如果用户在当前IO线程调用这个函数,回调会同步进行;如果用户在其他线程调用这个函数,cb会被加入队列,IO线程会被唤醒来调用这个任务。
有了这个功能,我们就可以轻易地在线程间调配任务。

void EventLoop::runInLoop(const Functor& cb){
    if(isInLoopThread()) cb();
    else queueInLoop(cb); // 将cb放入任务队列中
}

在事件循环中,处理完IO事件后,会调用EventLoop::doPendingFunctors()处理任务队列。需要注意的是这个函数不是简单地在临界区内依次处理任务,而是把回调列表(任务列表) swap() 到局部变量 functors 中,这样一方面减小了临界区的长度,另一方面页避免类死锁(因为回调函数中可能调用queueInLoop() )。

void EventLoop::doPendingFunctors(){
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
    {
        MutexLockGuard lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    for(int i = 0; i < functors.size(); i++){
        functors[i]();
    }
    callingPendingFunctors_ = false;
}

Channel

每个Channel对象只属于一个EventLoop,因此每个Channel对象都只属于一个IO线程。每个Channel对象只负责一个fd的IO事件分发,但它并不拥有这个fd,也不会在析构时关闭这个fd。Channel会把不同的IO事件分发为不同的回调,例如ReadCallback,WriteCallback等。用户一般不直接使用Channel,而回使用更上层的封装,入TCPConnection。

Channel的主要成员变量:

typedef std::function<void()> EventCallback;
EventLoop* loop_; // 所属的EventLoop
const int fd_;  // 负责的文件描述符
int events_;    // 关心的IO事件
int revents_;   // 目前活动的事件,由EventLoop/Epoller设置
int index_;     // 供 Epoll 使用

bool eventHandling_; // 是否在进行事件分发

// 不同事件的回调函数,由其拥有者注册
// 例如TcpConnection在构造的时候会将其 handleRead 函数注册为其channel 的 readCallback_
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback errorCallback_;
EventCallback closeCallback_;

接口:

// 构造函数,需要提供其owner EventLoop和其负责的文件描述符
Channel(EventLoop* loop, int fd);
~Channel();

// 设置回调函数和处理事件
void handleEvent(int64_t receiveTime);
void setReadCallback(const EventCallback& cb) {readCallback_ = cb;}
void setWriteCallback(const EventCallback& cb) {writeCallback_ = cb;}
void setErrorCallback(const EventCallback& cb) {errorCallback_ = cb;}
void setCloseCallback(const EventCallback& cb) { closeCallback_ = cb; }
// 获取或设置信息
int fd() const {return fd_;}
int events() const {return events_;}
void set_revents(int revt) {revents_ = revt;}
bool isNoneEvent() const {return events_ == kNoneEvent;}
int index() {return index_;}
void set_index(int idx) {index_ = idx;}
EventLoop* ownerLoop() {return loop_;}
// 操作对应的fd
void enableReading() {events_ |= kReadEvent; update();}
void enableWriting() {events_ |= kWriteEvent; update();}
void disableWriting() {events_ &= ~kWriteEvent; update();}
void disableAll() {events_ = kNoneEvent; update();}
bool isWriting() const { return events_ & kWriteEvent; }

其中需要注意的是,Channel::update()会调用EventLoop::updateChannel(),后者会转而调用Epoller::updateChannel()

Channel::handleEvent()是Channel的核心,由EventLoop::loop()调用,它的功能是根据revents_的值分别调用不同的用户回调。这是它的简略版本

void Channel::handleEvent(int64_t receiveTime){
    eventHandling_ = true;
    if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)){
        if(closeCallback_) closeCallback_();
    }
    if(revents_ & EPOLLERR){
        if(errorCallback_) errorCallback_();
    }
    if(revents_ & (EPOLLIN | EPOLLPRI | EPOLLRDHUP)){
        if(readCallback_) readCallback_(receiveTime);
    }
    if(revents_ & EPOLLOUT){
        if(writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}

Epoller

Epoller是IO复用的封装,Epoller是EventLoop的成员,只供其owner EventLoop在IO线程中使用,因此不需要加锁。Epoller并不拥有Channel,Channel在析构之前必须unregister(使用EventLoop::removeChannel)。

Epoller成员变量:

typedef std::vector<struct epoll_event> EventList;
typedef std::map<int, Channel*> ChannelMap;

EventLoop* ownerLoop_; //其owner EventLoop
int epollfd_;
EventList events_; //保存epoll_wait返回的事件
ChannelMap channels_; //从fd到Channel*的映射

Epoller::poll是Epoller的核心,被EventLoop::loop()调用,它通过epoll_wait获取当前活动的IO事件,然后填充传入的activeChannels,并返回epoll_wait() return的时刻。

int64_t Epoller::poll(int timeoutMs, ChannelList* activeChannels){
    int numEvents = epoll_wait(epollfd_, &*events_.begin(),
                               events_.size(), timeoutMs);
    int64_t now = get_now();

    if(numEvents < 0) perror("epoll wait error");
    else if(numEvents > 0)fillActiveChannels(numEvents, activeChannels);

    return now;
}

Epoller::updateChannel()主要功能是维护和更新epollfd_监听的fd。

void Epoller::updateChannel(Channel* channel){
    assertInLoopThread();
    const int index = channel->index();
    if(index == kNew || index == kDeleted){
        int fd = channel->fd();
        if(index == kNew){
            assert(channels_.find(fd) == channels_.end());
            channels_[fd] = channel;
        }
        else{
            assert(channels_.find(fd) != channels_.end());
            assert(channels_[fd] == channel);
        }
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else{
        if(channel->isNoneEvent()){
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(kDeleted);
        }
        else{
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

void Epoller::update(int operation, Channel* channel){
    struct epoll_event event;
    bzero(&event, sizeof event);
    event.events = channel->events();
    event.data.ptr = channel;
    int fd = channel->fd();
    if(epoll_ctl(epollfd_, operation, fd, &event) < 0){
        perror("epoll_mod error");
    }
}

TimerQueue

muduo的定时器功能由三个class实现,TimerId,Timer,TimerQueue。muduo中有自己的时间戳class,这里为了方便,使用从1970年00:00:00到当前时刻的微妙数作为事件戳。

Timer的定义:

typedef std::function<void()> TimerCallback;

class Timer : noncopyable{
public:
    Timer(const TimerCallback& cb, int64_t when, double interval)
    : callback_(cb), expiration_(when), interval_(interval), repeat_(interval > 0.0) {}

    void run() const{
        callback_();
    }

    int64_t expiration() const {return expiration_;}
    bool repeat() const {return repeat_;}

    void restart(int64_t now){
        if(repeat_){
            expiration_ = now + interval_ * 1000000;
        }
        else{
            expiration_ = 0;
        }
    }

private:
    const TimerCallback callback_;  // 定时器回调函数
    int64_t expiration_;            // 下一次超时时间
    const double interval_;         // 超时时间间隔,如果是一次性定时器,该值为0
    const bool repeat_;             // 是否重复
};

TimerQueue的成员变量:

typedef std::pair<int64_t, Timer *> Entry;
typedef std::set <Entry> TimerList;

EventLoop* loop_; // owner EventLoop
const int timerfd_; // 使用linux中的timerfd,可以用和处理IO事件相同的方式处理定时
Channel timerfdChannel_; // 负责timerfd_的Channel
TimerList timers_; // 保存定时器的数据结构,这里使用set

TimerQueue::getExpired()会从timers_中移除已经到时的Timer,并通过vector返回它们。TimerQueue::getExpired() 会被 timerfdChannel_中注册的readCallback_函数调用

std::vector<TimerQueue::Entry> TimerQueue::getExpired(int64_t now){
    std::vector<Entry> expired;
    Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
    auto it = timers_.lower_bound(sentry);
    assert(it == timers_.end() || now < it->first);
    std::copy(timers_.begin(), it, back_inserter(expired));
    timers_.erase(timers_.begin(), it);

    return expired;
}

timerfdChannel_中注册的readCallback_函数如下:

void TimerQueue::handleRead(){
    loop_->assertInLoopThread();
    int64_t now = get_now();
    readTimerfd(timerfd_, now);

    std::vector<Entry> expired = getExpired(now);

    for(auto it = expired.begin(); it != expired.end(); it++){
        it->second->run();
    }
    reset(expired, now);
}

TimerQueue::addTimer用于向TimerQueue添加新的timer。注意我们不能直接在addTimer(),中调用insert(),因为addTimer()可能被其他线程调用,而我们必须保证insert()是在本线程中被调用。

TimerId TimerQueue::addTimer(const TimerCallback& cb, int64_t when, double interval){
    Timer* timer = new Timer(cb, when, interval);
    loop_->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
    return TimerId(timer);
}

void TimerQueue::addTimerInLoop(Timer* timer){
    loop_->assertInLoopThread();
    bool earliestChanged = insert(timer);
    if(earliestChanged){
        resetTimerfd(timerfd_, timer->expiration());
    }
}

EventLoopThread

IO线程不一定是主线程,我们可以在任何一个线程创建并运行EventLoop。一个线程也可以有不止一个IO线程,muduo中的main Reactor 和 sub Reactor 都是IO线程。因此我们需要EventLoopThread类。

class EventLoopThread : noncopyable{
public:
    EventLoopThread();
    ~EventLoopThread();
    EventLoop* startLoop();
private:
    void threadFunc();
    EventLoop* loop_;
    bool exiting_;
    Thread thread_;
    MutexLock mutex_;
    Condition cond_;
};

其中关键的EventLoopThread::startLoop()定义如下,这个函数会返回新线程中的EventLoop指针

EventLoop* EventLoopThread::startLoop(){
    assert(!thread_.started());
    thread_.start();
    {
        MutexLockGuard lock(mutex_);
        // 一直等到threadFun在Thread里真正跑起来
        while(loop_ == NULL){
            cond_.wait();
        }
    }
    return loop_;
}

测试

一个简单的测试程序,建立类几个定时器任务。

#include "EventLoop.h"
#include "Channel.h"
#include "base/CurrentThread.h"
#include "base/Thread.h"
#include <stdio.h>
#include <sys/timerfd.h>
#include <string.h>
#include <functional>

int cnt = 0;
EventLoop* g_loop;

void printTid()
{
  printf("pid = %d, tid = %d\n", getpid(), CurrentThread::tid());
}

void print(const char* msg)
{
  printf("msg %s\n", msg);
  if (++cnt == 20)
  {
    g_loop->quit();
  }
}

int main()
{
  printTid();
  EventLoop loop;
  g_loop = &loop;

  print("main");
  loop.runAfter(1, std::bind(print, "once1"));
  loop.runAfter(1.5, std::bind(print, "once1.5"));
  loop.runAfter(2.5, std::bind(print, "once2.5"));
  loop.runAfter(3.5, std::bind(print, "once3.5"));
  loop.runEvery(2, std::bind(print, "every2"));
  loop.runEvery(3, std::bind(print, "every3"));

  loop.loop();
  print("main loop exits");
  sleep(1);
}

代码地址

https://github.com/ZhaoxuWang/simple-muduo/tree/main/stage1


版权声明:本文为weixin_38880337原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。