文章目录
1.解决问题
在 linux 中,默认情况下所有的 socket 都是 blocking,一个典型的读操作流程
read函数的过程
当用户进程调用了 read 这个系统调用,kernel 就开始了 IO 的第一个阶段:准备数据。
对于 network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的数据包)这个时候 kernel
就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。
当 kernel 一直等到数据准备好了,它就会将数据从 kernel 中拷贝到用户内存,然后 kernel 返回结果, 用户进程才解除 block 的状态,重新运行起来。关于blocking IO的特点
就是在 IO 执行的两个阶段(等待数据和拷贝数据两个阶段)都被 block 了。
几乎所有的程序员第一次接触到的网络编程都是从 listen()、send()、recv() 等接口开始的,这些接口都是阻塞型的。
使用这些接口可以很方便的构建服务器/客户机的模型。
下面是一 个简单地“一问一答”的服务器。
大部分的socket 接口都是阻塞型的。
所谓阻塞型接口是指系统调用(一般是 IO 接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回 实际上,除非特别指定,几乎所有的 IO 接口 (包括 socket 接口 ) 都是阻塞型的。
这给网络编程带来了一个很大的问题,如在调用send()的同时,线程将被阻塞,在此期间,线程 将无法执行任何运算或响应任何的网络请求。改进方案1
一个简单的改进方案是在服务器端使用多线程或多进程
目的是让每个连接都拥有独立的线程或进程,这样任何一个连接的阻塞都不会影响其他的连接。
具体使用多进程还是多线程,并没有一个特定的模式。
传统意义上,进程的开销要远远大于线程,所以如果需要同时为较多的客户机提供服务,则不推荐使用多进程。
如果单个服务执行体需要消耗较多的CPU资源,譬如需要进行大规模或长时间的数据运算或文件访问则进程较为安全。
通常使用 pthread_create()创建新线程或者fork()创建新进程。
我们假设对上述的服务器 / 客户机模型,提出更高的要求,即让服务器同时为多个客户机提供一问一答的服务。
于是有了如下的模型。
改进方案2
c. I/O复用(select和poll)
其中IO复用模型就是select一种典型用法
IO多路复用,即当有多个IO需要检测是否可以进行读写时,可以通过Select检测,不需要阻塞在等待数据
其中Unix下共有五种I/O模型
a. 阻塞I/O
b. 非阻塞I/O
c. I/O复用(select和poll)
d. 信号驱动I/O(SIGIO)
e. 异步I/O(Posix.1的aio_系列函数)
2.函数使用说明
//头文件
#include <sys/time.h>
#include <unistd.h>
/*
参数一:需要监控的最大的文件描述符数量加1。
参数二:用于检查可读性,
参数三:用于检查可写性,
参数四:用于检查带外数据,
参数五:一个指向timeval结构的指针,用于决定select等待I/o的最长时间。
对于select函数的功能简单的说就是对文件fd做一个测试。测试结果有三种可能:
1.timeout=NULL(阻塞:select将一直被阻塞,直到某个文件描述符上发生了事件)
2.timeout所指向的结构设为非零时间(等待固定时间:如果在指定的时间段里有事件发生或者时间耗尽,函数均返回)
3.timeout所指向的结构,时间设为0(非阻塞:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生)
timeval结构的定义:
struct timeval{
long tv_sec; // seconds
long tv_usec; // microseconds
}
函数返回值
>0:就绪描述字的正数目
-1:出错
0 :超时
*/
int select (int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
细节补充
参数一:nfds最大文件描述符 默认1024
# 查看进程的文件描述符上限
ulimit -n
# 修改进程的文件描述符上限为2048,临时修改,只对当前shell有效
ulimit -SHn 2048
# 永久修改:编辑/etc/security/limits.conf
vi /etc/security/limits.conf
* hard nofile 65536
* soft nofile 65536
除了每个进程的文件描述符有上限以外,系统还有一个总的上限,默认是100262
# 查看系统的文件描述符上限,方法一
sysctl -a | grep file-max
# 查看系统的文件描述符上限,方法二
cat /proc/sys/fs/file-max
# 修改系统的文件描述符上限
sysctl -w fs.file-max=2048
# 使修改生效
sysctl -p
对于fd_set类型通过下面四个宏来操作:
FD_ZERO(fd_set *fdset) 将指定的文件描述符集清空,在对文件描述符集合进行设置前,必须对其进行初始化。如果不清空,由于在系统分配内存空间后,通常并不作清空处理,所以结果是不可知的。
FD_SET(fd_set *fdset) 用于在文件描述符集合中增加一个新的文件描述符。
FD_CLR(fd_set *fdset) 用于在文件描述符集合中删除一个文件描述符。
FD_ISSET(int fd,fd_set *fdset) 用于测试指定的文件描述符是否在该集合中
服务端实例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/poll.h>
#define BUFFER_LENGTH 1024
#define POLL_SIZE 1024
#define EPOLL_SIZE 1024
void* client_callback(void *arg) {
int clientfd = *(int*)arg;
while (1) {
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("read all data\n");
}
//close(clientfd);
return NULL;
} else if (ret == 0) {
printf("disconnect \n");
//close(clientfd);
return NULL;
} else {
printf("Recv:%s, %d Bytes\n", buffer, ret);
//return NULL;
}
}
}
int main(int argc, char *argv[]) {
if (argc < 2) {
printf("Paramter Error\n");
return -1;
}
int port = atoi(argv[1]);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("socket");
return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
perror("bind");
return 2;
}
if (listen(sockfd, 5) < 0) {
perror("listen");
return 3;
}
#if 0
while (1) {
//c10k
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
if (clientfd <= 0) continue;
pthread_t thread_id;
int ret = pthread_create(&thread_id, NULL, client_callback, &clientfd);
if (ret < 0) {
perror("pthread_create");
exit(1);
}
}
#elif 1
//select方法
fd_set rfds, rset;
FD_ZERO(&rfds);
FD_SET(sockfd, &rfds);//添加监控accpet事件
int max_fd = sockfd;
int i = 0;
while (1) {
rset = rfds;//rfds是一个清空好的fd_set
int nready = select(max_fd+1, &rset, NULL, NULL, NULL);
if (nready < 0) {
printf("select error : %d\n", errno);
continue;
}
//有新连接过来 sockfd是一个accept类型
if (FD_ISSET(sockfd, &rset)) {
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
if (clientfd <= 0)
continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
ntohs(client_addr.sin_port), sockfd, clientfd);
if (max_fd == FD_SETSIZE) {
printf("clientfd --> out range\n");
break;
}
//添加新连接到rfds集合
FD_SET(clientfd, &rfds);
if (clientfd > max_fd) max_fd = clientfd;
printf("sockfd:%d, max_fd:%d, clientfd:%d\n", sockfd, max_fd, clientfd);
if (--nready == 0) continue;
}
for (i = sockfd + 1;i <= max_fd;i ++) {
if (FD_ISSET(i, &rset)) {
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(i, buffer, BUFFER_LENGTH, 0);
if (ret < 0) {
//返回值<0时并且(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)的情况下认为连接是正常的,继续接收。
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("read all data");
}
FD_CLR(i, &rfds);
close(i);
} else if (ret == 0) {
//连接断开 需要从rfds集合删掉该文件描述符 并且关闭
printf(" disconnect %d\n", i);
FD_CLR(i, &rfds);
close(i);
break;
} else {
//正常收到数据
printf("Recv: %s, %d Bytes\n", buffer, ret);
//业务代码
}
if (--nready == 0) break;
}
}
}
#endif
return 0;
}