手写C++muduo库的梳理总结
mainReactor就是我们代码中的mainloop(baseloop),subReactor是我们代码中的工作线程进行读写事件的处理,read是底层做的,decode是我们自己做的。compute和encode也是我们自己做的。从网络上read数据,然后从网络上send数据,这些是由muduo库帮我们做的。decode compute encode是我们用户在OnMessage处理的业务逻辑。每一个线程都是一个独立的Reactor
我们来梳理一下
Channel
我们看看它的成员变量:
channel主要做的事情是:
封装了fd,events和revents,还有一组回调方法
fd就是表示要往poller上注册的文件描述符
events就是事先设置的fd所感兴趣的事件(读事件或者写事件)
revents就是poller最终给我们channel通知的这个fd上发生的事件,channel根据相应的发生的事件来执行相应的回调。
对于上层来说,如果有一个fd的话,它就会把fd打包成channel通道,然后下发到poller上。
我们来看看poller的头文件:
poller有一个成员变量channels,是一个map表,键就是channel打包的sockfd,值就是包含fd对应的channel,也就是说如果poller检测到哪个fd有事件发生了,就可以通过发生事件的fd和这个map表,找到这个fd对应的channel,这个channel里面就记录着详细的对应事件的回调方法。
但是channel和poller是独立的,它们是不能直接互相通信的,它们都是依赖eventloop来通信的。
不管是channel还是poller,都有一个成员变量记录它所在的事件循环(loop成员变量)。
muduo库不管监听的listenfd还是accept返回,客户端连接成功返回跟客户端专门通信的connectionfd,都会把这个fd打包成channel,给它写入fd感兴趣的事件(读或者写事件),然后注册到相应的loop的poller上去。
poller,我们在项目中实现的主要就是epollpoller,底层就是epoll,意味着底层就是通过epoll_ctrl把相应的channel里面包含的fd注册到epoll上,当epoll返回以后,因为poller上有map表channels,通过sockfd找到它所对应的channel来调用相应的回调。
channelcallbacks回调是上层设置的。
我们总共就有2种channel,因为就2种fd:listenfd(依赖于accptor)和已建立连接的客户端专门通信用的connectionfd,
如下图表示:channel在调用相应的回调,比如说是listenfd发生的事件,那么它所调用的回调函数就是acceptor塞给它的,如果是已建立连接的客户端专门通信用的connectionfd,肯定就是TCPconnection塞给它的。channel和poller之间不能互相访问。
它们2个都是依赖eventloop事件循环的
eventloop相当于是reactor
poller和epollpoller相当于是多路事件分发器
EventLoop
我们看到,它有一个activeChannels_,这是一个vector,包含了所有的channel,因为上层交给反应堆的不只是单纯的fd,因为反应堆获取fd,还要执行相应的回调,所以塞给反应堆的都是channel,我们不要忘记,它还有weakupfd!
wakeupfd也是被封装成wakeupchannel_,主要作用是一个wakeupfd,是隶属于一个loop,也就是说,每个loop都有一个wakeupfd,因为loop最后执行触发底层的事件分发器,是epoll_wait,只要没有事件发生,相应的loop线程阻塞在epoll_wait上,如果我们想唤醒loop所在线程的阻塞状态,我们直接通过loop对象获取其wakeupfd,直接在这个wakeupfd上写个东西,相应的loop就会被唤醒。因为每个loop的wakeupfd也被封装成一个channel,注册在自己loop底层的epoll事件分发器上。
从大体上看,eventloop管理的就是一堆channel和一个poller,和自己的wakeupfd
这样一来,channel想把自己注册到poller上,或者是在poller上修改自己感兴趣的事件,channel都是通过eventloop获取poller的对象,来向poller设置,同样的,如果poller监测到有socketfd有相应的事件发生,就通过eventloop调用channel相应的fd所发生事件的回调函数。
我们看到,eventloop上有一个vector(pendingFunctors),存了一堆的回调函数。
因为这些回调,每一个回调在执行的时候都应该是在loop自己所在的线程执行,如果说当前线程调用了loop,让这个loop执行回调,在程序逻辑上会进行个判断,如果当前线程就是这个loop那么直接执行回调,否则的话,就把它存到pendingFunctors这个vector里面,然后唤醒相应的loop,然后在vector里拿相应的回调执行。
EventThreadPool
Thread是底层的线程。
EventloopThread是事件的线程
EventloopThreadPool是事件循环线程池
EventloopThreadPool里面有一个方法:getNextloop方法,默认是通过轮询算法获取下一个subloop,如果客户没有提供setthreadnumber设置线程,那么就是没有创建subloop,getNextloop永远返回的是baseloop
当我们通过setthreadnumber设置底层的线程数量,EventloopThreadPool就会驱动底层开始创建线程,一个线程一个loop,就是one loop per thread
EventloopThreadPool的内容如下
EventloopThread的内容如下
包含了底层的线程模型thread
下面这个方法是启动新线程,新线程的执行函数:
初始化好之后,开始epoll_wait(loop.loop)
Acceptor
Acceptor封装了listenfd相关的操作,比如说:创建listenfd,绑定bind,listen,listen成功后,把listenfd打包成acceptorchannel扔给baseloop来监听它的事件。
Buffer是缓冲区,对于nonblocking(非阻塞)的I/O,我们都需要设置缓冲区,涉及到应用写数据写到缓冲区,再写到TCP的发送缓冲区,然后send数据
因为TCP的发送缓冲区,写满了就要发送,这是同步的过程,效率比较慢。
如果有缓冲区的话,我们用户写数据,直接向缓冲区进行写就可以了。
缓冲区里有prependable,readeridx,writeridx,这3个下标
prependable有8个字节,是头部信息,刚开始readeridx和writeridx都在这8个字节处的位置,然后我们写数据进去,写好的东西就可以是待发送的东西,由Buffer专门进行在缓冲区读取数据进行发送,我们从缓冲区可以进行send发送,用户也可以不断从缓冲区写数据,通过readeridx和writeridx处理
模仿了Java的neti网络库channelbuffer
应用产生数据快,tcp发送的慢,难道每一次发送数据都要等tcp发送完上一次的数据再发送当前数据?写数据到buffer,然后让系统去调用write,或者系统调用read,把数据读到buffer上。相当于应用发送或者接收数据和tcp真实发送或者接收数据就成异步的了,这就非常高效了!!!
TcpConnection
一个连接成功的客户端对应一个TcpConnection,
这个TcpConnection包含的东西有哪些?
封装了socket,channel,和各种回调,高水位线的控制(不要发送过快),发送和接收缓冲区。
对于connectionfd在channel上执行的回调,都是由TcpConnection来设置的
TcpServer就是总领所有
TcpServer的成员:
Acceptor(得到新用户,才能通过EventLoopThreadpool的getnextloop,把新用户封装tcpconnection设置各种各样的回调以后,才能选择一个subloop,把新的connection扔给相应的loop),EventLoopThreadpool,一堆callback,connectionmap
流程叙述
首先,我们是怎么使用的?
1.定义一个loop,就是baseloop
2.InetAddress打包了IP地址和端口号
3.创建server对象,给到EchoServer的构造函数,通过其参数进行初始化底层的TCPserver对象,保留了loop
4.设置了2个回调
5.设置了底层的loop线程的数量(这个数量不包括baseloop)
很好的把网络代码和业务代码分离了,我们只需要做开发者需要关注的:
6.我们用户作为开发者只需要关注下面的2个方法:
响应连接的建立和断开
响应读写事件
设置完之后
7.server.start();启动loop
loop.loop()
总结而来就是:首先构建tcpserver对象,设置回调方法,然后设置底层的loop线程的数量,然后调用start方法,最后开启主线程的loop
我们看看TcpServer的构造都做了什么?
创建了Acceptor,EventLoopThreadPool(还没有开启loop线程)
首先Acceptor创建了一个非阻塞的listenfd,然后把它打包成1个channel,准备往mainloop的poller上扔。
然后设置了一些tcp选项,创建socket,绑定bind,给channel设置回调,就是readcallback
也就是说:Acceptchannel只关心读事件。
只关心acceptchannel有新用户的连接的事件,底层的channel会去执行readcallback,对于Acceptor的readcallback执行的是handleRead
我们回过来继续看tcpserver的构造函数,接下来创建了EventLoopThreadPool(还没有开启loop线程),
然后执行了setNewConnectionCallback
我们刚才看到acceptor的构造函数,
绑定了一个setReadCallback,也就是说,有新用户的连接,底层的channel回去执行readcallback,因为有新用户的连接,响应了epollin事件,然后就执行readcallback
而对应accptor执行的setreadcallback,执行的是handleread
我们进去看看handleread
这个里面的回调newConnectionCallback是谁设置的?
这个setNewconnectionCallback是谁设置的?
是最上层TcpServer设置的!
注册在mainloop上的acceptor,当有新用户连接,成功以后,经过上面一串,最终执行的是TcpServer的newConnection方法。
我们继续看TcpServer
我们看看start方法
通过atomic变量started控制一下
我们最开始初始化为0,只能取启动1次,我们看第一句代码,threadpool启动底层线程池,创建loop子线程,并且开启loop的loop()
子线程创建执行的就是
我们设置的线程的回调方法就是
59行马上就启动loop()了
启动的时候,为了能把处于睡眠中的subloop唤醒,每个loop都有一个wakeupfd注册在相应loop的子线程的poller上。
接着我们看start的第二句
执行acceptor的listen方法,把acceptorchannel注册在baseloop上的poller上。
这一切就都准备好了
main函数最后一句开启baseloop的loop
我们在调用的三步骤:
1、构建TcpServer对象,同时包含了注册回调,设置底层线程的个数,和扩展。
2、start开启loop子线程,注册wakeupfd,能够让主线程mainloop来唤醒子线程loop。然后acceptor listen,把listenfd打包成一个acceptorchannel注册到baseloop上。
3、最后启动loop();
我们看看acceptor的listen
muduo库:mainloop和subloop之间并没有同步队列,并没有生产者消费者模型,mainloop生成连接,放到同步队列里面,subloop从同步队列去自己取连接,做成异步的生成连接和消费连接,并没有。
而是用系统调用eventfd,创建wakeupfd,可以用它直接做线程间的notify,就是通知,唤醒,效率是非常非常高的。
在libevent上用的是sockpair这个系统调用,创建的是基于Unix本地通信的双向通信的套接字。
我们继续看,现在有一个新的连接了,mainloop就要返回了,执行acceptorchannel的readcallback,最终调用的是TCPserver的newconnection
然后通过轮询算法选择一个subloop,让这个ioloop指针指向,
接着创建1个tcpconnection对象
然后设置了一些回调,这些回调将来由TcpConnection设置到底层的channel里面。
主要的是设置了
当fd被关闭的时候,最终回调到TcpServer 的removeconnection,最终执行下面这个方法
这个ioLoop是肯定是在当前的主线程里去执行的,但是主线程访问ioLoop访问的肯定是一个子线程对应的loop,所以这里的ioLoop->runInLoop肯定不是在主线程里面执行的,除非是没有设置子线程。
我们看看runInLoop函数
queueInloop就是:每一个eventloop都有一个成员变量:wakeupfd,现在要在一个subloop里面执行下面这个方法:
通过这个指针找到subloop的wakeupfd,往wakeupfd上写数据,把相应的subloop唤醒,唤醒以后就去执行这个connectionEstablished,
我们进入connectEstablished
设置了状态,一个connection肯定是包含有一个channel,因为一个connection表示一个connectfd,就会打包成1个channel。
用channel绑定了当前 的TcpConnection
因为只有channel才能收到poller给它通知的事件回调,它执行的事件回调都是TcpConnection设置给它的,如果TcpConnection由于一些原因这个对象TcpConnection没有了,那channel到时候还执行不执行回调?
所以,channel在这里使用了 weakptr弱智能指针来记录了这个TcpConnection对象,到时候通过弱智能指针的提升来监测TcpConnection是否存活,存在就执行相应的回调,不存在就不执行了。
然后channel调用enableReading函数,向相应的poller注册channel,唤醒后把当前TcpConnectionchannel注册在它选择的某个subloop上了,
在主函数调用中,新连接来了,就是
连接成功,有数据通信怎么办?
对于新连接来说,都是注册了socketfd的epollIn事件,如果有相应的可读事件到来,那么相应的loop线程的poller就会返回,返回以后就执行相应的channel的readcallback事件,
通过的是
执行的就是Tcpconnection的handleread
handleread开始读数据,数据读上来
调用messagecallback
相应的响应就是给上层应用报上来,回调给用户。
通过buf->retrieveALLAsString拿到原始字符串,反序列化得到json或者pbbuff,然后业务处理,然后响应send。
所以说,相应的channel有可读事件发生,subloop就会回调channel的readcallback,tcpconnection给相应的channel的readcallback绑定的就是tcpconnection的handleread,handleread通过inputbuffer读取相应的socketfd上的数据,读取完,回调用户设置的messagecallback
然后用户这里就响应了
接着我们来看连接的关闭
如果有异常,对端关闭了或者是当前服务端在这里主动调用shutdown,底层的channel都会响应closecallback
这个closecallback,TCPconnection设置的是哪个回调?
handleclose把对应的channel和感兴趣的事件从poller上删除。然后执行用户的回调。
对于使用者来说,onconnection又发生了回调,
执行tcpconnection的
这次是断开连接的回调
执行TCPserver的
就调用到
在TCPserver里,把这个connection从connectionsmap删除掉,然后getloop,获取这条连接所在的子loop,执行connectDestroyed,
已经设置成kDisconnected了