muduo网络库设计与实现(一)
文章目录
这是根据陈硕的《linux多线程服务端编程》中的“ muduo网络库设计与实现”部分写的简单笔记。具体代码可以参考陈硕的github。虽然大体框架一样,但也有些许不同,文章最后也会把我的代码放出来。
这是第一部分,主要实现
Reactor模型的框架。base
这部分是一些工具类的封装。最重要的是多线程相关的函数接口的封装,主要包括
- Thread:线程的创建和等待结束
- MutexLock:mutex的创建,销毁,加锁,解锁
- Condition:条件变量的创建,销毁,等待,通知,广播
除此之外还有更高层的封装,例如ThreadPool,CountDownLatch等。
noncopyable
有两种思路
- 一种使用
delete关键字修饰拷贝构造函数和拷贝赋值函数 - 第二种将拷贝构造函数和拷贝赋值函数设置为private
class noncopyable {
public:
noncopyable(const noncopyable&) = delete;
const noncopyable& operator=( const noncopyable& ) = delete;
protected:
noncopyable() = default;
~noncopyable() = default;
};
Mutex
包括类MutexLock和类MutexLockGuard,这两个类都比较简单,所以就不过多介绍
class MutexLock : noncopyable{
public:
MutexLock() : holder_(0){
pthread_mutex_init(&mutex_, NULL);
}
~MutexLock(){
pthread_mutex_lock(&mutex_);
pthread_mutex_destroy(&mutex_);
}
bool isLockByThisThread(){
return holder_ == CurrentThread::tid();
}
void lock(){ //仅供 MutexLockGuard 调用
pthread_mutex_lock(&mutex_);
holder_ = CurrentThread::tid();
}
void unlock(){ //仅供 MutexLockGuard 调用
holder_ = 0;
pthread_mutex_unlock(&mutex_);
}
pthread_mutex_t* getPthreadMutex(){ //仅供 Condition 调用
return &mutex_;
}
private:
pthread_mutex_t mutex_;
pid_t holder_;
};
class MutexLockGuard : noncopyable{
public:
explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex){
mutex_.lock();
}
~MutexLockGuard(){
mutex_.unlock();
}
private:
MutexLock& mutex_;
};
Condition
class Condition : noncopyable{
public:
explicit Condition(MutexLock& mutex) : mutex_(mutex){
pthread_cond_init(&pcond_, NULL);
}
~Condition(){
pthread_cond_destroy(&pcond_);
}
void wait(){
pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());
}
void notify(){
pthread_cond_signal(&pcond_);
}
void notifyAll(){
pthread_cond_broadcast(&pcond_);
}
// returns true if time out, false otherwise.
bool waitForSeconds(int seconds){
struct timespec abstime;
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec += static_cast<time_t>(seconds);
return ETIMEDOUT == pthread_cond_timedwait(
&pcond_, mutex_.getPthreadMutex(), &abstime);
}
private:
MutexLock& mutex_;
pthread_cond_t pcond_;
};
CountDownLatch
CountDownLatch是一种常用且易用的同步手段,主要有两种用途:
- 主线程发起多个子线程,等这些子线程各自完成一定任务后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化。
- 主线程发起多个子线程,子线程都等待主线程,主线程完成其他一些任务后,通知所有子线程开始执行。通常用于多个子线程等待主线程发出“起跑”命令
class CountDownLatch : noncopyable{
public:
//倒数几次
explicit CountDownLatch(int count) : mutex_(), condition_(mutex_), count_(count) {}
//等待计数值变为0
void wait(){
MutexLockGuard lock(mutex_);
while (count_ > 0) condition_.wait();
}
//计数减1
void countDown(){
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0) condition_.notifyAll();
}
int getCount() const {
MutexLockGuard lock(mutex_);
return count_;
}
private:
mutable MutexLock mutex_;
Condition condition_;
int count_;
};
CurrentThread
这部分的功能是获取线程标识。因为各种原因pthread_t不适合作为线程的标识符,muduo中使用下面的函数调用的返回值作为线程的id。
pid_t gettid() { return static_cast<pid_t>(::syscall(SYS_gettid)); }
同时为了避免多次进行系统调用,使用__thread变量来缓存gettid()的返回值,这样只有在本线程第一次调用的时候才会进行系统调用,以后都直接从thread local缓存的线程id拿到结果。所以在获取本线程id时,应该使用下面的函数
inline int tid() {
if (__builtin_expect(t_cachedTid == 0, 0)) {
cacheTid();
}
return t_cachedTid;
}
__thread是GCC内置的线程局部存储设施,但需要注意它的使用规则:
- 只能用于修饰POD类型
- 只能修饰全局变量,函数内的静态变量;不能修饰函数的局部变量或class的普通成员变量
- 用__thread修饰的变量只能用编译期常量初始化。
Thread
这部分是对线程的封装,具体实现看代码,比较简单。像前文所说,最重要的是线程的创建和等待。需要关注的是Thread类的构造函数,析构函数,start(),join()。
//线程需要运行的函数,和线程的名称
explicit Thread(ThreadFunc, const std::string& name = std::string());
//如果线程已经开始,但还没join,那么将线程分离
Thread::~Thread() {
if(started_ && !joined_) pthread_detach(pthreadId_);
}
void Thread::start() {
assert(!started_);
started_ = true;
ThreadData* data = new ThreadData(func_, name_, &tid_, &latch_);
if(pthread_create(&pthreadId_, NULL, &startThread, data)){
started_ = false;
delete data;
}
else{
latch_.wait();
assert(tid_ > 0);
}
}
int Thread::join() {
assert(started_);
assert(!joined_);
joined_ = true;
return pthread_join(pthreadId_, NULL);
}
Reactor模型
muduo多线程模型
在进入具体代码之前,我们先简单介绍一下muduo的多线程模型。muduo使用one loop per thread和线程池的模式,基本上是个reactor的模型。
网络编程中有许多事务性的工作,可以提取为公用的框架,而用户只需要填上业务逻辑代码,并将回调注册到框架中,就可以实现完整的网络服务,这正是Reactor模式的主要思想。Reactor的意义在于将IO事件分发到用户提供的处理函数,并保持网络部分通用代码不变,独立于用户的业务逻辑。
一个单线程的Reactor逻辑是方程简单的,在没有事件的时候,线程等待在epoll_wait等函数上,事件到达后由网络库处理IO,再把消息通知(回调)客户端代码。
muduo使用的是one loop per thread模式,有一个main Reactor负责accept连接,然后把连接挂在某个sub Reactor中。这是muduo默认的多线程模型。
如果有需要,也可以用线程池来处理计算,也就是既使用多个Reactor来处理IO,又使用线程池来处理计算,这种方式适合既有突发IO,又有突发计算的应用。
总结muduo网络库,支持四种实用的方案:
- 单线程Reactor
- Reactor + 线程池
- one loop per thread
- one loop per thread + 线程池
在本节,我们会实现Reactor事件处理框架,在这的基础上,可以非常简单的扩展到one loop per thread模型。本节要实现的有一下几部分:
Eventloop:事件循环(反应器Reactor),每个线程只能有一个Eventloop实体,它负责IO和定时器事件的分派。它用TimerQueue作为计时器管理,用Epoller作为IO复用。Epoller:封装了epoll的IO复用,它是Eventloop的成员,生命期由后者控制。TimerQueue:用timerfd实现定时,用std::map来管理Timer。它是Eventloop的成员,生命期由后者控制Channel:负责注册与响应IO事件,注意它不拥有文件描述符。EventLoopThread:启动一个线程,在其中运行EventLoop::loop()
EventLoop
Eventloop的成员变量:
typedef std::vector<Channel*> ChannelList;
bool looping_; //是否正在进行循环
bool quit_; // 是否希望退出循环
bool callingPendingFunctors_; // 是否正在处理任务队列
const pid_t threadId_; // 该线程的id
int64_t pollReturnTime_; // 本轮epoll_wait返回的时间戳
std::shared_ptr<Epoller> poller_; // 该EventLoop的epoller
std::shared_ptr<TimerQueue> timerQueue_; //该EventLoop的定时器
ChannelList activeChannels_; // 用于epoller返回有事件的channel
int wakeupFd_; // 用于唤醒的fd
std::shared_ptr<Channel> wakeupChannel; // 用于唤醒的Channel
MutexLock mutex_; // 与任务队列相关的锁
std::vector<Functor> pendingFunctors_; // 任务队列 @GuardedBy mutex_
EventLoop的构造函数会记住本对象所属的线程threadId_,创建了EventLoop对象的线程是IO线程,其主要功能是运行事件循环EventLoop::loop()。
void EventLoop::loop(){
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
while(!quit_){
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for(auto it = activeChannels_.begin(); it != activeChannels_.end(); it++){
(*it)->handleEvent();
}
doPendingFunctors();
}
looping_ = false;
}
EventLoop有几个定时器接口EventLoop::runAt(),EventLoop::runAfter(),EventLoop::runEvery()这几个函数都转而调用TimerQueue::addTimer,注意几个函数允许跨线程使用,这就带来类线程安全问题。muduo解决办法不是加锁,而是把对TimerQueue的操作转移到IO线程来进行,这会用到EventLoop::runInLoop()
TimerId EventLoop::runAt(const int64_t& time, const TimerCallback& cb){
return timerQueue_->addTimer(cb, time, 0.0);
}
TimerId EventLoop::runAfter(double delay, const TimerCallback& cb){
int64_t time = addTime(get_now(), delay);
return runAt(time, cb);
}
TimerId EventLoop::runEvery(double interval, const TimerCallback& cb){
int64_t time = addTime(get_now(), interval);
return timerQueue_->addTimer(cb, time, interval);
}
EventLoop::runInLoop()是非常有用的功能,它可以在它的IO线程中执行某用户的任务回调。如果用户在当前IO线程调用这个函数,回调会同步进行;如果用户在其他线程调用这个函数,cb会被加入队列,IO线程会被唤醒来调用这个任务。
有了这个功能,我们就可以轻易地在线程间调配任务。
void EventLoop::runInLoop(const Functor& cb){
if(isInLoopThread()) cb();
else queueInLoop(cb); // 将cb放入任务队列中
}
在事件循环中,处理完IO事件后,会调用EventLoop::doPendingFunctors()处理任务队列。需要注意的是这个函数不是简单地在临界区内依次处理任务,而是把回调列表(任务列表) swap() 到局部变量 functors 中,这样一方面减小了临界区的长度,另一方面页避免类死锁(因为回调函数中可能调用queueInLoop() )。
void EventLoop::doPendingFunctors(){
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for(int i = 0; i < functors.size(); i++){
functors[i]();
}
callingPendingFunctors_ = false;
}
Channel
每个Channel对象只属于一个EventLoop,因此每个Channel对象都只属于一个IO线程。每个Channel对象只负责一个fd的IO事件分发,但它并不拥有这个fd,也不会在析构时关闭这个fd。Channel会把不同的IO事件分发为不同的回调,例如ReadCallback,WriteCallback等。用户一般不直接使用Channel,而回使用更上层的封装,入TCPConnection。
Channel的主要成员变量:
typedef std::function<void()> EventCallback;
EventLoop* loop_; // 所属的EventLoop
const int fd_; // 负责的文件描述符
int events_; // 关心的IO事件
int revents_; // 目前活动的事件,由EventLoop/Epoller设置
int index_; // 供 Epoll 使用
bool eventHandling_; // 是否在进行事件分发
// 不同事件的回调函数,由其拥有者注册
// 例如TcpConnection在构造的时候会将其 handleRead 函数注册为其channel 的 readCallback_
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback errorCallback_;
EventCallback closeCallback_;
接口:
// 构造函数,需要提供其owner EventLoop和其负责的文件描述符
Channel(EventLoop* loop, int fd);
~Channel();
// 设置回调函数和处理事件
void handleEvent(int64_t receiveTime);
void setReadCallback(const EventCallback& cb) {readCallback_ = cb;}
void setWriteCallback(const EventCallback& cb) {writeCallback_ = cb;}
void setErrorCallback(const EventCallback& cb) {errorCallback_ = cb;}
void setCloseCallback(const EventCallback& cb) { closeCallback_ = cb; }
// 获取或设置信息
int fd() const {return fd_;}
int events() const {return events_;}
void set_revents(int revt) {revents_ = revt;}
bool isNoneEvent() const {return events_ == kNoneEvent;}
int index() {return index_;}
void set_index(int idx) {index_ = idx;}
EventLoop* ownerLoop() {return loop_;}
// 操作对应的fd
void enableReading() {events_ |= kReadEvent; update();}
void enableWriting() {events_ |= kWriteEvent; update();}
void disableWriting() {events_ &= ~kWriteEvent; update();}
void disableAll() {events_ = kNoneEvent; update();}
bool isWriting() const { return events_ & kWriteEvent; }
其中需要注意的是,Channel::update()会调用EventLoop::updateChannel(),后者会转而调用Epoller::updateChannel()。
Channel::handleEvent()是Channel的核心,由EventLoop::loop()调用,它的功能是根据revents_的值分别调用不同的用户回调。这是它的简略版本
void Channel::handleEvent(int64_t receiveTime){
eventHandling_ = true;
if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)){
if(closeCallback_) closeCallback_();
}
if(revents_ & EPOLLERR){
if(errorCallback_) errorCallback_();
}
if(revents_ & (EPOLLIN | EPOLLPRI | EPOLLRDHUP)){
if(readCallback_) readCallback_(receiveTime);
}
if(revents_ & EPOLLOUT){
if(writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
Epoller
Epoller是IO复用的封装,Epoller是EventLoop的成员,只供其owner EventLoop在IO线程中使用,因此不需要加锁。Epoller并不拥有Channel,Channel在析构之前必须unregister(使用EventLoop::removeChannel)。
Epoller成员变量:
typedef std::vector<struct epoll_event> EventList;
typedef std::map<int, Channel*> ChannelMap;
EventLoop* ownerLoop_; //其owner EventLoop
int epollfd_;
EventList events_; //保存epoll_wait返回的事件
ChannelMap channels_; //从fd到Channel*的映射
Epoller::poll是Epoller的核心,被EventLoop::loop()调用,它通过epoll_wait获取当前活动的IO事件,然后填充传入的activeChannels,并返回epoll_wait() return的时刻。
int64_t Epoller::poll(int timeoutMs, ChannelList* activeChannels){
int numEvents = epoll_wait(epollfd_, &*events_.begin(),
events_.size(), timeoutMs);
int64_t now = get_now();
if(numEvents < 0) perror("epoll wait error");
else if(numEvents > 0)fillActiveChannels(numEvents, activeChannels);
return now;
}
Epoller::updateChannel()主要功能是维护和更新epollfd_监听的fd。
void Epoller::updateChannel(Channel* channel){
assertInLoopThread();
const int index = channel->index();
if(index == kNew || index == kDeleted){
int fd = channel->fd();
if(index == kNew){
assert(channels_.find(fd) == channels_.end());
channels_[fd] = channel;
}
else{
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
}
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else{
if(channel->isNoneEvent()){
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
else{
update(EPOLL_CTL_MOD, channel);
}
}
}
void Epoller::update(int operation, Channel* channel){
struct epoll_event event;
bzero(&event, sizeof event);
event.events = channel->events();
event.data.ptr = channel;
int fd = channel->fd();
if(epoll_ctl(epollfd_, operation, fd, &event) < 0){
perror("epoll_mod error");
}
}
TimerQueue
muduo的定时器功能由三个class实现,TimerId,Timer,TimerQueue。muduo中有自己的时间戳class,这里为了方便,使用从1970年00:00:00到当前时刻的微妙数作为事件戳。
Timer的定义:
typedef std::function<void()> TimerCallback;
class Timer : noncopyable{
public:
Timer(const TimerCallback& cb, int64_t when, double interval)
: callback_(cb), expiration_(when), interval_(interval), repeat_(interval > 0.0) {}
void run() const{
callback_();
}
int64_t expiration() const {return expiration_;}
bool repeat() const {return repeat_;}
void restart(int64_t now){
if(repeat_){
expiration_ = now + interval_ * 1000000;
}
else{
expiration_ = 0;
}
}
private:
const TimerCallback callback_; // 定时器回调函数
int64_t expiration_; // 下一次超时时间
const double interval_; // 超时时间间隔,如果是一次性定时器,该值为0
const bool repeat_; // 是否重复
};
TimerQueue的成员变量:
typedef std::pair<int64_t, Timer *> Entry;
typedef std::set <Entry> TimerList;
EventLoop* loop_; // owner EventLoop
const int timerfd_; // 使用linux中的timerfd,可以用和处理IO事件相同的方式处理定时
Channel timerfdChannel_; // 负责timerfd_的Channel
TimerList timers_; // 保存定时器的数据结构,这里使用set
TimerQueue::getExpired()会从timers_中移除已经到时的Timer,并通过vector返回它们。TimerQueue::getExpired() 会被 timerfdChannel_中注册的readCallback_函数调用
std::vector<TimerQueue::Entry> TimerQueue::getExpired(int64_t now){
std::vector<Entry> expired;
Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
auto it = timers_.lower_bound(sentry);
assert(it == timers_.end() || now < it->first);
std::copy(timers_.begin(), it, back_inserter(expired));
timers_.erase(timers_.begin(), it);
return expired;
}
timerfdChannel_中注册的readCallback_函数如下:
void TimerQueue::handleRead(){
loop_->assertInLoopThread();
int64_t now = get_now();
readTimerfd(timerfd_, now);
std::vector<Entry> expired = getExpired(now);
for(auto it = expired.begin(); it != expired.end(); it++){
it->second->run();
}
reset(expired, now);
}
TimerQueue::addTimer用于向TimerQueue添加新的timer。注意我们不能直接在addTimer(),中调用insert(),因为addTimer()可能被其他线程调用,而我们必须保证insert()是在本线程中被调用。
TimerId TimerQueue::addTimer(const TimerCallback& cb, int64_t when, double interval){
Timer* timer = new Timer(cb, when, interval);
loop_->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer);
}
void TimerQueue::addTimerInLoop(Timer* timer){
loop_->assertInLoopThread();
bool earliestChanged = insert(timer);
if(earliestChanged){
resetTimerfd(timerfd_, timer->expiration());
}
}
EventLoopThread
IO线程不一定是主线程,我们可以在任何一个线程创建并运行EventLoop。一个线程也可以有不止一个IO线程,muduo中的main Reactor 和 sub Reactor 都是IO线程。因此我们需要EventLoopThread类。
class EventLoopThread : noncopyable{
public:
EventLoopThread();
~EventLoopThread();
EventLoop* startLoop();
private:
void threadFunc();
EventLoop* loop_;
bool exiting_;
Thread thread_;
MutexLock mutex_;
Condition cond_;
};
其中关键的EventLoopThread::startLoop()定义如下,这个函数会返回新线程中的EventLoop指针
EventLoop* EventLoopThread::startLoop(){
assert(!thread_.started());
thread_.start();
{
MutexLockGuard lock(mutex_);
// 一直等到threadFun在Thread里真正跑起来
while(loop_ == NULL){
cond_.wait();
}
}
return loop_;
}
测试
一个简单的测试程序,建立类几个定时器任务。
#include "EventLoop.h"
#include "Channel.h"
#include "base/CurrentThread.h"
#include "base/Thread.h"
#include <stdio.h>
#include <sys/timerfd.h>
#include <string.h>
#include <functional>
int cnt = 0;
EventLoop* g_loop;
void printTid()
{
printf("pid = %d, tid = %d\n", getpid(), CurrentThread::tid());
}
void print(const char* msg)
{
printf("msg %s\n", msg);
if (++cnt == 20)
{
g_loop->quit();
}
}
int main()
{
printTid();
EventLoop loop;
g_loop = &loop;
print("main");
loop.runAfter(1, std::bind(print, "once1"));
loop.runAfter(1.5, std::bind(print, "once1.5"));
loop.runAfter(2.5, std::bind(print, "once2.5"));
loop.runAfter(3.5, std::bind(print, "once3.5"));
loop.runEvery(2, std::bind(print, "every2"));
loop.runEvery(3, std::bind(print, "every3"));
loop.loop();
print("main loop exits");
sleep(1);
}
代码地址
https://github.com/ZhaoxuWang/simple-muduo/tree/main/stage1