讨论QQ群:135202158
线程池如上一篇随笔(http://www.cnblogs.com/zzqcn/p/3585003.html)提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,这些线程处于睡眠中;一旦有工作加入工作队列,其中的某些线程就会醒来,处理这些工作,完成后继续睡眠 。
要实现线程池(只针对本文的简单实现而言),应设计和构建3样东西:
含N个线程的线程组
工作队列
工作线程例程
线程组和工作队列表示如下:
/** Threads:
*
* +----------+----------+------+------------+
* | thread 0 | thread 1 | .... | thread n-1 |
* +----------+----------+------+------------+
*
* Job Queue:
*
* back front
* | |
* v v
* +-------+ +-------+ +-------+
* | job 0 | -> | job 1 | -> ... -> | job x |
* +-------+ +-------+ +-------+
**/
线程组可以用普通数组或者动态分配的数组实现,维数就是池中线程数量,存放的其实是线程ID。工作队列可以直接用C++ queue容器实现。
工作线程例程(线程函数)的大致执行流程如下图所示:
/**
* Each Thread Routine:
* Job-Queue
* | ...
* v |
* +-------+ +---------+ EnQueue
* +---> | sleep | (No job) | new job |
* | +-------+ +---------+
* | | |
* | | DeQueue +---------+
* | +
* | | +---------+
* | v
* | +---------+
* | | do work |
* | +---------+
* | |
* | |
* +----
**/
工作队列中没有工作时它就睡眠 ,有工作时苏醒,从队列首部取出(&删除)一个工作,然后开始执行。
另外,我们还需要一个互斥锁L和一个计数信号量S,互斥锁用来同步工作队列的增删操作,计数信号量用来对工作队列中的工作数量进行记录。工作线程会一直等待S,直到它大于0。
下面给出完整代码。
1. threadpool.h
1 /*
2 * Linux线程池的简单实现.3 * Author: 赵子清4 * Blog:http://www.cnblogs.com/zzqcn
5 *6 **/
7
8
9
10 #ifndef __THREADPOOL_H__11 #define __THREADPOOL_H__
12
13
14 #include
15 #include
16 #include
17
18
19
20 #define DLPTP_MAX_THREADS 1024
21
22
23 structtp_job_t24 {25 void (*work) (void*);26 void*arg;27 };28
29 structtp_threadpool_t30 {31 pthread_t*threads;32 size_t nthreads;33 std::queuejobs;34 sem_t njobs;35 pthread_mutex_t lock;36 boolrunning;37 };38
39
40 tp_threadpool_t*tp_init(size_t _nthreads);41 int tp_deinit(tp_threadpool_t*_ptp);42 void* tp_worker(void*_ptp);43 int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void*_arg);44
45
46 #endif
47
2. threadpool.cpp
1 /*
2 * Linux线程池的简单实现.3 * Author: 赵子清4 * Blog:http://www.cnblogs.com/zzqcn
5 *6 **/
7
8
9
10 #include "threadpool.h"
11
12
13
14 tp_threadpool_t*tp_init(size_t _nthreads)15 {16 if(_nthreads < 1 || _nthreads >DLPTP_MAX_THREADS)17 returnNULL;18
19 int err = 0;20 tp_threadpool_t* ret =NULL;21 size_t i, j;22
23 ret = newtp_threadpool_t;24 if(NULL ==ret)25 returnNULL;26 ret->nthreads =_nthreads;27 ret->threads = newpthread_t[_nthreads];28 if(NULL == ret->threads)29 {30 delete ret;31 returnNULL;32 }33 ret->running = true;34
35 err = sem_init(&ret->njobs, 0, 0);36 if(-1 ==err)37 {38 delete[] ret->threads;39 delete ret;40 returnNULL;41 }42
43 err = pthread_mutex_init(&ret->lock, NULL);44 if(err)45 {46 sem_destroy(&ret->njobs);47 delete[] ret->threads;48 delete ret;49 returnNULL;50 }51
52 for(i=0; i<_nthreads err="pthread_create(&ret-">threads[i], NULL, tp_worker, (void*)ret);55 if(err)56 {57 ret->running = false;58 for(j=0; jthreads[j]);61 pthread_join(ret->threads[j], NULL);62 }63 pthread_mutex_destroy(&ret->lock);64 sem_destroy(&ret->njobs);65 delete[] ret->threads;66 delete ret;67 returnNULL;68 }69 }70
71 returnret;72 }73
74
75 int tp_deinit(tp_threadpool_t*_ptp)76 {77 if(NULL ==_ptp)78 return -1;79
80 int err = 0;81 size_t i, j;82
83 //TODO: if now worker has job to handle, do something then exit
84 while(!_ptp->jobs.empty());85
86 _ptp->running = false;87
88 for(i=0; i<_ptp->nthreads; ++i)89 {90 err = sem_post(&_ptp->njobs); /*V, ++*/
91 if(err)92 {93 for(j=i; j<_ptp->nthreads; ++j)94 pthread_cancel(_ptp->threads[j]);95 break;96 }97 }98
99 for(i=0; i<_ptp->nthreads; ++i)100 pthread_join(_ptp->threads[i], NULL);101
102 pthread_mutex_destroy(&_ptp->lock);103 sem_destroy(&_ptp->njobs);104
105 delete[] _ptp->threads; _ptp->threads =NULL;106 delete _ptp; _ptp =NULL;107
108 return 0;109 }110
111
112 void* tp_worker(void*_ptp)113 {114 if(NULL ==_ptp)115 returnNULL;116
117 tp_threadpool_t* p = (tp_threadpool_t*)_ptp;118
119 while(p->running)120 {121 sem_wait(&p->njobs); /*P, --*/
122
123 if(!p->running)124 returnNULL;125
126 void (*work) (void*);127 void*arg;128 tp_job_t job;129
130 pthread_mutex_lock(&p->lock); /*LOCK*/
131
132 job = p->jobs.front();133 work =job.work;134 arg =job.arg;135 p->jobs.pop();136
137 pthread_mutex_unlock(&p->lock); /*UNLOCK*/
138
139 work(arg);140 }141
142 returnNULL;143 }144
145
146 int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void*_arg)147 {148 if(NULL == _ptp || NULL ==_work)149 return -1;150
151 tp_job_t job;152 job.work =_work;153 job.arg =_arg;154
155 pthread_mutex_lock(&_ptp->lock); /*LOCK*/
156 _ptp->jobs.push(job);157 sem_post(&_ptp->njobs); /*V, ++*/
158 pthread_mutex_unlock(&_ptp->lock); /*UNLOCK*/
159
160 return 0;161 }
3. 测试程序main.cpp
1 /*
2 * Linux线程池测试.3 * Author: 赵子清4 * Blog:http://www.cnblogs.com/zzqcn
5 *6 **/
7
8 #include
9 #include
10 #include "threadpool.h"
11
12
13 /*task 1*/
14 void task1(void*_arg)15 {16 printf("# Thread working: %u\n", (int)pthread_self());17 printf("Task 1 running..\n");18 usleep(5000);19 }20
21
22 /*task 2*/
23 void task2(void*_arg)24 {25 printf("# Thread working: %u\n", (int)pthread_self());26 printf("Task 2 running..");27 printf("%d\n", *((int*)_arg));28 usleep(5000);29 }30
31
32 #define N_THREADS 4
33
34 int main(int argc, char**argv)35 {36 tp_threadpool_t* ptp =NULL;37 inti;38
39 ptp =tp_init(N_THREADS);40 if(NULL ==ptp)41 {42 fprintf(stderr, "tp_init fail\n");43 return -1;44 }45
46 int a = 32;47 for(i=0; i<10; ++i)48 {49 tp_add_job(ptp, task1, NULL);50 tp_add_job(ptp, task2, (void*)&a);51 }52
53 tp_deinit(ptp);54
55 return 0;56 }