A:SOCKET-IO复用技术
1.五个I/O模型
阻塞I/O
非阻塞I/O
I/O复用(select和poll)
信号驱动I/O
异步I/O
2.阻塞I/O模型
最流行的I/O模型是阻塞I/O模型,缺省时,所有的套接口都是阻塞的

3.非阻塞I/O模型
当我们把一个套接口设置为非阻塞方式时,即通知内核:当请求的I/O操作非得让进程睡眠不能完成时,不要让进程睡眠,而应返回一个错误

应用程序连续不断地查询内核,看看某操作是否准备好,这对cpu时间是极大的浪费,一般只在专门提供某种功能的系统中才会用到
4.I/O复用模型
有了I/O复用,我们就可以调用select或poll,在这两个系统调用的某一个上阻塞,而不是真正阻塞于真正的I/O系统调用

5.信号驱动I/O模型
我们也可以用信号,让内核在描述字准备好时用信号SIGIO通知我们,我们将此方法称为信号驱动I/O

6.异步I/O模型
异步I/O是Posix.1的1993版本中的新内容,我们让内核启动操作,并在整个操作完成后通知我们

7.I/O复用
如果一个或多个I/O条件满足(例如:输入已准备好被读,或者描述字可以承接更多输出的时候)我们就能够被通知到,这样的能力被称为I/O复用,是由函数select和poll支持的
8.I/O复用网络应用场合
当客户处理多个描述字
一个客户同时处理多个套接口
如果一个tcp服务器既要处理监听套接口,又要处理连接套接口
如果一个服务器既要处理TCP,又要处理UDP
9.select函数作用
这个函数允许进程指示内核等待多个事件中的任一个发生,并仅在一个或多个事件发生或经过某指定的时间后才唤醒进程
10.select函数什么情况下返回
作为一个例子,我们可以调用函数select并通知内核仅在下列情况发生时才返回:
集合{1,4,5}中的任何描述子准备好读 或
集合{2,7}中的任何描述字准备好写或
集合{1,4}中的任何描述字有异常条件待处理或
已经过了10.2秒
也就是说,通知内核我们对哪些描述字感兴趣
(读、写或异常条件)以及等待多长时间。
11.select函数
包含头文件<sys/select.h><sys/socket.h>
功能:提供了即时响应多个套接的读写事件
原型:
int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *except,const struct timeval *timeout);参数
maxfdp1:等待最大套接字值加1,(等待套接字的数量)
readset:要检查读事件的容器
writeset:要检查写事件的容器
timeout:超时时间
返回值:返回触发套件接字的个数
12.timeval结构
虽然结构timeval为我们指定了一个微秒级的分辨率,但内核支持的分辨率却要粗糙得多。例如,很多UNIX内核将超时值向上舍入成10ms的倍数。另外还有调度延迟现象,即定时器时间到后内核还需花一点时间调度相应进程的运行
struct timeval(
long tv_sec; //秒
long tv_usec;//微秒
);
13.timeout参数
timeout参数有三种可能
永远等待下去:仅在有一个描述字准备好I/O时才返回,为此,我们将timeout设置为空指针
等待固定时间:在有一个描述字准备好I/O是返回,但不超过由timeout参数所指timeval结构中指定的秒数和微秒数
根本不等待:检查描述字后立即返回,这称为轮询。定时器的值必须为0
14.fd_set参数
select使用描述字集,它一般是一个整数数组,每个数中的每一位对应一个描述字。
使用fd_set数据类型来表示这个描述字集,我们不用去关心具体的实现细节。
15.操作fd_set的四个宏
void FD_ZERO(fd_set *fdset); //清空描述字集合
void FD_SET(int fd, fd_set *fdset); //添加一个描述字到集合中
void FD_CLR(int fd, fd_set *fdset); //从集合中删除一个描述字
int FD_ISSET(int fd, fd_set *fdset);//描述字是否在该集合中16.fd_set使用
对集合的初始化是很重要的,如果集合作
为一个自动变量分配而未初始化,那将导致不可预测的后果
• fd_set rset;
• FD_ZERO(&rset);
• FD_SET(1,&rset);
• FD_SET(4,&rset);
• FD_SET(5,&rset);17.select函数的三个中间参数
中间的三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件所需的描述字
如果我们对某个条件不感兴趣,这三个参数中相应的参数就可以设为空指针
18.maxfdp1参数
Maxfdp1指定被测试的描述字个数,它的值是要被测试的最大描述字加1,描述字0、1、2…….一直到maxfdp1均被测试
头文件<sys/select.h>中定义的常值FD_SETSIZE,是数据类 型fd_set的描述字数量,其值通常是1024
19.select函数返回值
当返回时,结果指示哪些描述字已准备好。
返回时,我们用宏FD_ISSET来测试结构fd_set中的描述 字。描述字集中任何与没有准备好的描述字相对应的位返回时清成0。为此,每次调用select时,我们都得将所有描述字集中关心的都置为1
如果在任何描述字准备好之前定时器时间到,则返回0,
返回-1表示有错。
20.套接口准备好读
套接口接收缓冲区中的数据字节数大于等于套接口接收缓冲区低潮限度的当前值
连接的读这一半关闭。读操作将不阻塞且返回0
套接口是一个监听套接口且已完成的连接数为非0
有一个套接口错误待处理,对这样的套接口的读操作将不阻塞且返回一个错误(-1)
21.套接口准备好写
套接口发送缓冲区中的可用空间字节数大于等于套接口发送缓冲区低潮限度的当前值
连接的写这一半关闭,对这样的套接口的写操作将产生信号(SIGPIPE)
有一个套接口错误待处理。对这样的套接口的写操作将不阻塞且返回一个错误(-1)
22.shutdown函数
包含头文件<sys/socket.h>
功能:关闭套接字两端或一端的socket
原型:
int shutdown(int sockfd,int howto)参数
SHUT_RD 关闭连接的读这一半,不再接收套接口中的数据且现留在套接口接收缓冲区中的数据都作废
SHUT_WR 关闭连接的写这一半,在TCP场合下,这称为为半关闭。当前留在套接口发送缓冲区中的数据都被发送,后跟正常的tcp连接终止序列
SHUT_RDWR 连接的读这一半和写这一半都关闭
返回值:成功返回0,失败返回错误代码
23.shutdown与close的区别
终止网络连接的正常方法是调用close,但close有两个限制可由函数shutdown来避免。
close将描述字的访问计数减1,仅在此计数为0时才关闭套接口;用shutdown我们可以激发TCP的正常连接终止序列,而不管访问计数
Close终止了数据传送的两个方向:读和写。由于TCP连接是全双工的,有很多时候我们要通知另一端我们已完成了数据发送,即使一端仍有许多数据要发送也是如此.
24.Poll模型
Poll函数和select类似,但它是用文件描述符而不是条件的类型来组织信息的.
也就是说,一个文件描述符的可能事件都存储在struct pollfd中.与之相反,select用事件的类型来组织信息,而且读,写和错误情况都有独立的描述符掩码.poll函数是POSIX:XSI扩展的一部分,它起源于UNIX System V
25.函数poll原型
包含头文件<poll.h>
功能:与select函数功能相同
原型:
int poll(struct pollfd *fdarray,unsigned long nfds,int timeout);参数
fdarray是一个pollfd的机构体数组用来表示表示文件描述符的监视信息
nfds参数给出了要监视的描述符数目
timeout参数是一个用豪秒表示的时间,是poll在返回前没有接收事件是应等待的时间,如果timeout的值为-1,poll就永远不会超时.如果整数值为32个比特,那么最大超时周期约为30分钟
返回值:准备好描述字的个数,0-超时,1-出错
26.Pollfd结构体
fd是文件描述符值
event和revents是通过代表各种事件的标准符进行逻辑或运算构建而成的
Struct pollfd
{
int fd;
short events; //感兴趣的事件
short revents; //fd上触发的事情
}
27.Poll函数事件标志

28.Epoll
epoll在Linux2.6内核正式提出,是基于事件驱动的I/O方式,相对于select来说
,epoll没有描述符个数限制,使用一个文件描述符管理多个描述符,
将用户关心的文件描述符的事件存放到内核的一个事件表中,
这样在用户空间和内核空间的copy只需一次。
Linux中提供的epoll相关函数如下:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);1. epoll_create 函数创建一个epoll句柄,参数size表明内核要监听的描述符数量。
调用成功时返回一个epoll句柄描述符,失败时返回-1。
核心数据结构是:1个红黑树和1个链表
2. epoll_ctl 函数注册要监听的事件类型。四个参数解释如下:
epfd 表示epoll句柄
op 表示fd操作类型,有如下3种
EPOLL_CTL_ADD 注册新的fd到epfd中
EPOLL_CTL_MOD 修改已注册的fd的监听事件
EPOLL_CTL_DEL 从epfd中删除一个fd
fd 是要监听的描述符
event 表示要监听的事件
epoll_event 结构体定义如下:struct epoll_event { __uint32_t events; /* Epoll events */ EPOLLIN表示对应的文件描述符可以读 epoll_data_t data; /* User data variable */ }; typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
3. epoll_wait 函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0。
epfd 是epoll句柄
events 表示从内核得到的就绪事件集合
maxevents 告诉内核events的大小
timeout 表示等待的超时事件
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
29.C/S结构图

30.实现群聊的简单思路
登录步骤
1.客户端第一次登陆服务器,要向服务器发送登录请求包(用户啊昵称,用户IP)
2服务器收到之后对客户端进行注册(把昵称和ip放进一个数组中增加一个为其产生的子进程编号.
然后把该数字写进共享内存,
最后通过遍历该数组获得每个进程编号发送信号。
每个子进程收到信号之后读取共享内存里面的数字获得昵称和对应IP,然后发送到相应的客户端中
群聊步骤
客户端发送群聊包,服务器子进程收到信息后,往共享内存中写入对方发的聊天信息和对方的昵称以及本进程id
然后跟客户端通讯的子进程发送kill(0,12),那么进程组所有成员都能获得信号,在信号处理函数内去读取共享内存内容
取出来后先比较下自己的进程id和共享内存里的pid是否一致。如果不一致就发送聊天内容
私聊步骤
客户端发送私聊包,包里面包含对方的昵称和ip
子进程收到私聊包后。同样也要把数据写进共享内存。发送信号给进程组。
每个子进程都会收信号,在信号处理函数内部,读取共享内存的时候判断包类型,然后如果是私聊包,再判断自己的进程是否是要私聊的子进程,如果是直接取出转发。如果不是丢弃不管
B:SOCKET封装
1.网络事件
TCP 网络编程最本质的是处理三个半事件:
1.连接的建立,包括服务端接受 (accept) 新连接和客户端成功发起 (connect) 连接。
2.连接的断开,包括主动断开 (close 或 shutdown) 和被动断开 (read 返回 0)。
3.消息到达,文件描述符可读。这是最为重要的一个事件,对它的处理方式决定了网络编程的风格(阻塞还是非阻塞,如何处理分包,应用层的缓冲如何设计等等)。
4.消息发送完毕,这算半个。对于低流量的服务,可以不必关心这个事件;另外,这里“发送完毕”是指将数据写入操作系统的缓冲区,将由 TCP 协议栈负责数据的发送与重传,不代表对方已经收到数据。
2.注册事件
socket_notify_event event_list[] = {OnConnect, OnRead, OnClose};
socket_inetaddr addr = {5555, “192.168.0.188"};
mini_server_start(addr, event_list);
3.结构体与函数指针
typedef struct tcp_connection_t tcp_connection;
typedef struct tcp_server_t tcp_server;
typedef struct socket_inetaddr_t socket_inetaddr;
typedef struct mio_select_t mio_select;
typedef void (*socket_notify_event)(tcp_connection *conn);
// 启动服务器函数指针
typedef int (*server_start)(socket_inetaddr *addr);
// IO模型函数指针
typedef void (*server_loop)(void);
4.地址结构体
// 地址结构体
struct socket_inetaddr_t
{
unsigned short port;
char ip[16];
};
5.连接结构体
// 连接结构体
struct tcp_connection_t
{
int conn;
socket_inetaddr peeraddr;
};
6.select IO模型结构体
// select IO模型结构体
struct mio_select_t
{
fd_set rset;
fd_set tmpset;
server_loop start;
};
7.tcp服务结构体
struct tcp_server_t
{
int listenfd; //监听套接口
tcp_connection clients[FD_SETSIZE];//保存与客户端的连接数组
mio_select* io; //io模型,根据需要可以用poll,或者epoll替换
server_start start; //启动函数指针
socket_notify_event connect_event; //连接事件函数指针
socket_notify_event read_event; //可读事件函数指针
socket_notify_event close_event; //断开事件函数指针
};
// 对外开放的接口函数
int mini_server_start(socket_inetaddr addr, socket_notify_event* event_list);8.地址类
class CHostAddress
{
public:
// CHostAddress();
CHostAddress(const char *ip=127.0.0.1, unsigned short port=8800);
~CHostAddress();
void SetIp(const char *ip);
const char* GetIp();
void SetPort(unsigned short port);
const unsigned short GetPort();
struct sockaddr * Address();
struct sockaddr_in * InAddress();
int Length();
socklen_t* LengthPtr();
private:
struct sockaddr_in m_addr;
int m_length;
};
9.I/O类
class CBaseStream
{
public:
CBaseStream();
CBaseStream(int fd);
~CBaseStream(void);
void SetFd(int fd);
int GetFd() const;
int Read(char *buf, const int buf_len);
int Read(char *buf, const int buf_len, struct timeval *tv/*int timeout == -1*/);
int Read(char *buf, const int buf_len, CHostAddress &remote_addr);
int Read(char *buf, const int buf_len, CHostAddress &remote_addr, struct timeval *tv);
int Write(const char *buf, const int buf_len);
int Write(const char *buf, const int buf_len, struct timeval *tv);
int Write(const char *buf, const int buf_len, CHostAddress remote_addr);
int Write(const char *buf, const int buf_len, CHostAddress remote_addr, struct timeval *tv);
private:
int m_fd;
};10.socket基类
class CBaseSocket
{
public:
CBaseSocket();
CBaseSocket(int type, int sin_farmly = AF_INET, int protocol = 0);
~CBaseSocket();
int GetSocket() const { return m_fd; }
void SetSocket(int fd);
CHostAddress GetAddress();
bool Bind(CHostAddress addr);
bool Close();
protected:
void Create(int sin_farmly, int flag, int protocl);
int m_fd;
CBaseStream m_stream;
};
11.TCP服务器类
class CTcpServer : public CBaseSocket
{
public:
CTcpServer(int type,int domain=PF_INET,int protocol=0);
~CTcpServer();
bool Listen(int backlog);
CTcpSocket Accept();
private:
void Create();
};12.TCP客户端类
class CTcpSocket :
public CBaseSocket
{
public:
CTcpSocket(void);
CTcpSocket(const CTcpSocket &other);
CTcpSocket(int fd);
~CTcpSocket(void);
CTcpSocket & operator= (const CTcpSocket &other);
CHostAddress GetRemoteAddr() const;
void SetRemoteAddr(const CHostAddress remote_addr);
bool Connect(CHostAddress addr);
int Read(char *buf, const int buf_len);
int Read(char *buf, const int buf_len, struct timeval *tv);
int Write(char *buf, const int buf_len);
int Write(char *buf, const int buf_len, struct timeval *tv);
protected:
void Create();
CHostAddress m_remote_addr;
};
13.UDP类
class CUdpSocket : public CBaseSocket
{
public:
CUdpSocket(void);
CUdpSocket(int fd);
~CUdpSocket(void);
int Read(char *buf, const int buf_len, CHostAddress &remote_addr);
int Read(char *buf, const int buf_len, CHostAddress &remote_addr, struct timeval *tv);
int Write(char *buf, const int buf_len, CHostAddress remote_addr);
int Write(char *buf, const int buf_len, CHostAddress remote_addr, struct timeval *tv);
private:
void Create();
};
C:线程池封装
1.线程池基本概念
线程池是预先创建线程的一种技术。线程池在任务还没有到来之前,创建一定数量(N)的线程,放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。
当新任务到来时,缓冲池选择一个空闲线程,把任务传入此线程中运行;如果缓冲池已经没有空闲线程,则新建若干个线程。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。
2.线程池组成部分
线程池类
维护工作者线程队列(包括空闲与忙碌队列)
维护一个任务队列
维护一个线程池调度器指针
线程池调度器(本身也是一个线程)
负责线程调度
负责任务分配
工作者线程类(线程池中的线程类的封装)
任务队列
任务接口(实际的业务逻辑都继承自该接口)
3.线程池工作原理
线程池类至少提供三个接口,初始化线程池、销毁线程池、添加任务接口
初始化线程池
开启线程池调度器线程
预先创建N个线程(由线程调度池器类负责创建线工作者线程),放入空闲线程队列
指定最大的忙碌状态的线程数
销毁线程池
释放空闲队列中的线程与工作状态中的线程
释放调度器线程
添加任务
添加一实际任务,但是并没有立刻运行该任务,只是放入任务队列,由线程池调度器从任务队列获取该任务,并从线程池中获得一个线程来运行该任务,这里实际上是一种生产者消费者模型。
线程池调度器包含创建空闲线程、销毁空闲线程接口
线程池调度器本身也是一个线程,主要负责任务调度与线程调度,其工作过程大致如下:
从任务队列获取任务,如果队列为空,阻塞等待新任务到来
队列不为空,取出该任务,从空闲线程队列取一线程,如果为空,判断工作者线程数是否达到上限,如果没有,则创建若个空闲线程,否则等待某一任务执行完毕,并且该任务对应的线程归还给线程池
获得空闲工作者线程,将任务交给工作者线程来处理,工作者线程维护一任务指针,这里只要该指针指向任务,并且唤醒线程
判断空闲工作者线程数是否超过最大工作者线程数,如果超过,销毁(空闲线程数-允许最大空闲线程数)个线程
任务接口是一个抽象类,只有一个虚函数run方法,执行的是实际的业务逻辑
工作者线程维护一任务指针,工作者线程的任务主要是运行任务对象的run方法。
当线程池调度器调度一个工作者线程后,就唤醒工作者线程,并调用run方法来执行实际的业务逻辑,当run方法执行完毕,即业务逻辑处理结束,将工作者线程归还到空闲线程池队列,而不是销毁。这样线程池调度器下一次就有机会调度到该工作者线程。
4.线程池设计注意点
任务的通用性
线程创建和销毁策略
任务分配策略
5.任务的通用性
不同的业务解决方案有各自独特的任务处理方法,任务的划分上也就千差万别。为了使得在处理任务对象的时候达到一定程度的通用性,任务对象的设计上必须与实际任务的处理逻辑完全无关。从任务执行的角度看,任务不过是处理流程的一次或者多次执行的过程,可以这样来定义如下任务接口
class ITask
{
virtual bool run() = 0;
}
任务在其需要的时候才创建。任务的创建通过new操作,动态创建具体的任务对象,然后传入线程池,由线程池自动分配线程来执行此任务。
任务是否执行完毕由其自身来决定。一个未知任务什么时候执行完毕是不可能预测的,必须任务本身来决定。这个策略通过,run()的返回值来实现。当工作线程执行一次任务时,如果返回值为true,表示任务执行完毕,就用delete操作销毁此任务;如果返回值为false,表示任务需要执行的工作并未完成,继续执行此任务。
这样的策略,使得在设计新的任务处理流程的时候,不需要过多的关心任务的接口规范,只需要在新任务类的构造函数中初始化各种资源,在新任务类的析构函数中回收资源,在run()方法中实现主要的处理逻辑,那么新的任务类即可在线程池中执行。
6.线程创建和销毁策略
在缓冲池刚刚建立时,线程池中有一定数量(N1)的已创建好的线程,这样可以使得新任务可以及时的得到执行。估计出平均情况下,一次业务产生的任务数量N2。那么N1应该是N2的整数倍,N1=N2×n1
在线程缓冲池中的所有线程都处于繁忙状态的时候,线程池就会创建新的线程,设创建N2个。由以上分析,为了减少由于线程不够而再创建线程的概率,N3也应该是N2的整数倍,N3=N2×n2。
当服务器业务减少,出现大量线程闲置的情况,就应该销毁一部分线程。很显然,这里应该使用超时策略,当某些线程在超过时间T仍然处于闲置状态,就销毁一部分空闲线程。设销毁N4个空闲线程,为了减少由于线程不够而再创建线程的概率,N4也应该是N2的整数倍,N4=N2×n3。当然,为了使得新任务及时得到处理,即使服务器一直处于空闲,也应该保留N1个线程。
7.任务分配策略
在业务处理中,会有各种各样的任务对象,这些业务对象对系统资源的使用也不同。这些任务,无论其空间复杂度如何,从线程执行任务这一角度来看,应该关心的主要是时间复杂度。
线程缓冲池在接收到新任务的时候,首先要寻找空闲线程,传入新任务,然后执行任务,最后还要删除任务,置空闲线程的标志。寻找空闲线程、传入任务、最后的清理工作,这些都是为了执行任务而产生的额外开销,如果所执行的任务大多数都是轻量级任务,那么额外开销带来的资源浪费就显得很突出了。为了解决这个问题,可以给一个线程传入N5个轻量级任务,这一个线程依次执行N5个轻量级任务,由于都是在很短时间内完成,并不影响任务响应的及时性。显然,N5≥1。
