linux开源线程池库,线程池(Linux实现)

讨论QQ群:135202158

线程池如上一篇随笔(http://www.cnblogs.com/zzqcn/p/3585003.html)提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,这些线程处于睡眠中;一旦有工作加入工作队列,其中的某些线程就会醒来,处理这些工作,完成后继续睡眠 。

要实现线程池(只针对本文的简单实现而言),应设计和构建3样东西:

含N个线程的线程组

工作队列

工作线程例程

线程组和工作队列表示如下:

/** Threads:

*

* +----------+----------+------+------------+

* | thread 0 | thread 1 | .... | thread n-1 |

* +----------+----------+------+------------+

*

* Job Queue:

*

* back front

* | |

* v v

* +-------+ +-------+ +-------+

* | job 0 | -> | job 1 | -> ... -> | job x |

* +-------+ +-------+ +-------+

**/

线程组可以用普通数组或者动态分配的数组实现,维数就是池中线程数量,存放的其实是线程ID。工作队列可以直接用C++ queue容器实现。

工作线程例程(线程函数)的大致执行流程如下图所示:

/**

* Each Thread Routine:

* Job-Queue

* | ...

* v |

* +-------+ +---------+ EnQueue

* +---> | sleep | (No job) | new job |

* | +-------+ +---------+

* | | |

* | | DeQueue +---------+

* | +

* | | +---------+

* | v

* | +---------+

* | | do work |

* | +---------+

* | |

* | |

* +----

**/

工作队列中没有工作时它就睡眠 ,有工作时苏醒,从队列首部取出(&删除)一个工作,然后开始执行。

另外,我们还需要一个互斥锁L和一个计数信号量S,互斥锁用来同步工作队列的增删操作,计数信号量用来对工作队列中的工作数量进行记录。工作线程会一直等待S,直到它大于0。

下面给出完整代码。

1. threadpool.h

1 /*

2 * Linux线程池的简单实现.3 * Author: 赵子清4 * Blog:http://www.cnblogs.com/zzqcn

5 *6 **/

7

8

9

10 #ifndef __THREADPOOL_H__11 #define __THREADPOOL_H__

12

13

14 #include

15 #include

16 #include

17

18

19

20 #define DLPTP_MAX_THREADS 1024

21

22

23 structtp_job_t24 {25 void (*work) (void*);26 void*arg;27 };28

29 structtp_threadpool_t30 {31 pthread_t*threads;32 size_t nthreads;33 std::queuejobs;34 sem_t njobs;35 pthread_mutex_t lock;36 boolrunning;37 };38

39

40 tp_threadpool_t*tp_init(size_t _nthreads);41 int tp_deinit(tp_threadpool_t*_ptp);42 void* tp_worker(void*_ptp);43 int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void*_arg);44

45

46 #endif

47

2. threadpool.cpp

1 /*

2 * Linux线程池的简单实现.3 * Author: 赵子清4 * Blog:http://www.cnblogs.com/zzqcn

5 *6 **/

7

8

9

10 #include "threadpool.h"

11

12

13

14 tp_threadpool_t*tp_init(size_t _nthreads)15 {16 if(_nthreads < 1 || _nthreads >DLPTP_MAX_THREADS)17 returnNULL;18

19 int err = 0;20 tp_threadpool_t* ret =NULL;21 size_t i, j;22

23 ret = newtp_threadpool_t;24 if(NULL ==ret)25 returnNULL;26 ret->nthreads =_nthreads;27 ret->threads = newpthread_t[_nthreads];28 if(NULL == ret->threads)29 {30 delete ret;31 returnNULL;32 }33 ret->running = true;34

35 err = sem_init(&ret->njobs, 0, 0);36 if(-1 ==err)37 {38 delete[] ret->threads;39 delete ret;40 returnNULL;41 }42

43 err = pthread_mutex_init(&ret->lock, NULL);44 if(err)45 {46 sem_destroy(&ret->njobs);47 delete[] ret->threads;48 delete ret;49 returnNULL;50 }51

52 for(i=0; i<_nthreads err="pthread_create(&ret-">threads[i], NULL, tp_worker, (void*)ret);55 if(err)56 {57 ret->running = false;58 for(j=0; jthreads[j]);61 pthread_join(ret->threads[j], NULL);62 }63 pthread_mutex_destroy(&ret->lock);64 sem_destroy(&ret->njobs);65 delete[] ret->threads;66 delete ret;67 returnNULL;68 }69 }70

71 returnret;72 }73

74

75 int tp_deinit(tp_threadpool_t*_ptp)76 {77 if(NULL ==_ptp)78 return -1;79

80 int err = 0;81 size_t i, j;82

83 //TODO: if now worker has job to handle, do something then exit

84 while(!_ptp->jobs.empty());85

86 _ptp->running = false;87

88 for(i=0; i<_ptp->nthreads; ++i)89 {90 err = sem_post(&_ptp->njobs); /*V, ++*/

91 if(err)92 {93 for(j=i; j<_ptp->nthreads; ++j)94 pthread_cancel(_ptp->threads[j]);95 break;96 }97 }98

99 for(i=0; i<_ptp->nthreads; ++i)100 pthread_join(_ptp->threads[i], NULL);101

102 pthread_mutex_destroy(&_ptp->lock);103 sem_destroy(&_ptp->njobs);104

105 delete[] _ptp->threads; _ptp->threads =NULL;106 delete _ptp; _ptp =NULL;107

108 return 0;109 }110

111

112 void* tp_worker(void*_ptp)113 {114 if(NULL ==_ptp)115 returnNULL;116

117 tp_threadpool_t* p = (tp_threadpool_t*)_ptp;118

119 while(p->running)120 {121 sem_wait(&p->njobs); /*P, --*/

122

123 if(!p->running)124 returnNULL;125

126 void (*work) (void*);127 void*arg;128 tp_job_t job;129

130 pthread_mutex_lock(&p->lock); /*LOCK*/

131

132 job = p->jobs.front();133 work =job.work;134 arg =job.arg;135 p->jobs.pop();136

137 pthread_mutex_unlock(&p->lock); /*UNLOCK*/

138

139 work(arg);140 }141

142 returnNULL;143 }144

145

146 int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void*_arg)147 {148 if(NULL == _ptp || NULL ==_work)149 return -1;150

151 tp_job_t job;152 job.work =_work;153 job.arg =_arg;154

155 pthread_mutex_lock(&_ptp->lock); /*LOCK*/

156 _ptp->jobs.push(job);157 sem_post(&_ptp->njobs); /*V, ++*/

158 pthread_mutex_unlock(&_ptp->lock); /*UNLOCK*/

159

160 return 0;161 }

3. 测试程序main.cpp

1 /*

2 * Linux线程池测试.3 * Author: 赵子清4 * Blog:http://www.cnblogs.com/zzqcn

5 *6 **/

7

8 #include

9 #include

10 #include "threadpool.h"

11

12

13 /*task 1*/

14 void task1(void*_arg)15 {16 printf("# Thread working: %u\n", (int)pthread_self());17 printf("Task 1 running..\n");18 usleep(5000);19 }20

21

22 /*task 2*/

23 void task2(void*_arg)24 {25 printf("# Thread working: %u\n", (int)pthread_self());26 printf("Task 2 running..");27 printf("%d\n", *((int*)_arg));28 usleep(5000);29 }30

31

32 #define N_THREADS 4

33

34 int main(int argc, char**argv)35 {36 tp_threadpool_t* ptp =NULL;37 inti;38

39 ptp =tp_init(N_THREADS);40 if(NULL ==ptp)41 {42 fprintf(stderr, "tp_init fail\n");43 return -1;44 }45

46 int a = 32;47 for(i=0; i<10; ++i)48 {49 tp_add_job(ptp, task1, NULL);50 tp_add_job(ptp, task2, (void*)&a);51 }52

53 tp_deinit(ptp);54

55 return 0;56 }