线程池的实现


意义

线程池,字面意思可以理解成这是一个存放一些线程对象的池子,需要用到线程处理任务的时候就从池子中取出一个线程对象,处理完任务后把空闲的线程对象放回池子。

用线程池的意义在于避免了大量线程创建和销毁的时间开销。此外合理地设置线程池并发的线程数与缓存任务的队列,将可能对一个系统的效率、性能实现很大的提升。

我看了其他人大概的实现,然后自己简单实现了一个线程池。这个线程池基于POSIX Threads并使用C语言编写,已push到了github上:

https://github.com/morgorp/thread_pool

下面介绍一些实现的原理。


原理

需要的数据结构有:

  1. 线程池:存放线程对象,由于是基于POSIX Threads实现的,所以就可以表示为一个pthread_t数组。

  2. 任务:需要处理的任务,在C语言中可以用函数指针和参数来表示。

  3. 任务缓存队列:当所有线程都在忙,这时就要将新任务放到缓存队列中等待处理,我采用了循环队列以先来先服务的调度方式实现。

实现的主要操作有:

  1. 初始化线程池(init):给定线程池最大容量和缓存队列最大容量的参数去初始化线程池。

  2. 关闭线程池(shutdown):可以选择立即关闭或者等待还未处理的任务处理完,进入关闭状态后不会接受新的任务。

  3. 执行一个任务(execute):另一个线程池执行一个任务,任务可能不能立即执行则放入队列等待执行。

我参考过Java的线程池,线程池主要几个流程大致如此:

  1. 初始化线程池时并不创建线程对象。

  2. 添加新任务最一开始,如果线程池中没有空闲线程且线程对象未满,则创建一个线程对象。

  3. 添加新任务就把任务插入到缓存队列中,然后唤醒空闲的线程。

  4. 被唤醒的线程会从队列中取出任务然后执行。

  5. 每个线程对象做的事情就是不断地从队列取出任务执行,如果队列为空则休眠等待唤醒。

一些细节可以参见下面的代码实现。


代码

thread_pool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Copyright (C), 2017, progrom.
* File name: thread_pool.c
* Author: progrom
* Version: 0.01
* Date: 20170804
* Description: 线程池头文件
*/
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <pthread.h>
/* 线程池结构体 */
typedef struct thread_pool_t {
pthread_t *pool_thread; /* 线程数组 */
int pool_size; /* 线程池内线程最大数目(数组大小) */
int thread_num; /* 当前线程数 */
int running_cnt; /* 正在执行任务的线程数 */
struct thread_task_t *work_queue; /* 缓存未被处理任务的循环队列 */
int queue_size; /* 队列大小 */
volatile int queue_front; /* 队列头指针 */
volatile int queue_rear; /* 队列尾指针 */
volatile int status; /* 线程池状态:0 关闭; 1 开启*/
pthread_mutex_t lock; /* 保护线程池临界资源的锁 */
pthread_mutex_t newtasklock; /* 是否允许添加新任务,作用于令旧任务执行优先级大于新任务添加 */
pthread_cond_t wakeup; /* 令线程休眠等待任务或唤醒执行任务的条件变量 */
} thread_pool_t;
/* 任务结构体 */
typedef struct thread_task_t {
void (*func)(void *); /* 任务所要执行的函数 */
void *argv; /* 函数参数 */
} thread_task_t;
void thread_pool_init(thread_pool_t *, int, int);
void thread_task_init(thread_task_t *, void (*)(void *), void *);
void thread_pool_shutdown(thread_pool_t *, int);
int thread_pool_execute(thread_pool_t *, thread_task_t *);
static void *thread_execute(void *);
#endif

thread_pool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/**
* Copyright (C), 2017, progrom.
* File name: thread_pool.c
* Author: progrom
* Version: 0.01
* Date: 20170804
* Description: 线程池
*/
#include "thread_pool.h"
#include <stdlib.h>
#include <unistd.h>
/**
* Function: thread_pool_init
* Description: 初始化一个线程池
* Input:
* tp_p 所要初始化的线程池
* pool_size 线程池能容纳的最大线程数
* queue_size 任务队列能缓存的最大任务数
* Ouput:
* Return:
*/
void thread_pool_init
(thread_pool_t *tp_p, int pool_size, int queue_size)
{
if(tp_p==NULL || pool_size<1 || queue_size<1) {
return;
}
tp_p->pool_thread = malloc(sizeof(pthread_t) * pool_size);
tp_p->pool_size = pool_size;
tp_p->thread_num = 0;
tp_p->running_cnt = 0;
/* 因为循环队列采用的实现是预留一个空位以区分队满和队空状态 */
/* 故 queue_size + 1 */
tp_p->work_queue = malloc(sizeof(thread_task_t) * (queue_size+1));
tp_p->queue_size = queue_size+1;
tp_p->queue_front = 0;
tp_p->queue_rear = 0;
tp_p->status = 1;
pthread_mutex_init(&tp_p->lock, NULL);
pthread_mutex_init(&tp_p->newtasklock, NULL);
pthread_cond_init(&tp_p->wakeup, NULL);
}
/**
* Function: thread_task_init
* Description: 初始化一个任务
* Input:
* tt_p 所要初始化的任务
* func 任务所要执行的函数
* arg 函数的参数
* Ouput:
* Return:
*/
void thread_task_init
(thread_task_t *tt_p, void (*func)(void *), void *argv)
{
if(tt_p==NULL || func==NULL) {
return;
}
tt_p->func = func;
tt_p->argv = argv;
}
/**
* Function: thread_pool_shutdown
* Description:
* 关闭一个线程池
* 处于关闭状态的线程池不再接受任务
* 如果选择不立即关闭线程池(now=0)则会等待直到所有任务(包括队列里的任务)完成
* Input:
* tp_p 所要关闭的线程池
* now 是否立即关闭:0 否; 非0 是
* Ouput:
* Return:
*/
void thread_pool_shutdown(thread_pool_t *tp_p, int now)
{
if(tp_p == NULL) {
return;
}
/* 设置线程池状态为关闭 */
pthread_mutex_lock(&tp_p->lock);
tp_p->status = 0;
pthread_mutex_unlock(&tp_p->lock);
/* 立即关闭,取消所有线程 */
if(now != 0) {
for(int i=0; i<tp_p->thread_num; ++i) {
pthread_cancel(tp_p->pool_thread[i]);
}
}
/* 等待所有线程结束 */
for(int i=0; i<tp_p->thread_num; ++i) {
pthread_join(tp_p->pool_thread[i], NULL);
}
/* 释放动态分配的内存 */
free(tp_p->pool_thread);
free(tp_p->work_queue);
/* 销毁锁 */
pthread_cond_destroy(&tp_p->wakeup);
pthread_mutex_destroy(&tp_p->newtasklock);
pthread_mutex_destroy(&tp_p->lock);
}
/**
* Function: thread_pool_execute
* Description: 令一个线程池执行一个任务
* Input:
* tp_p 所要执行任务的线程池
* tt_p 所要执行的任务
* Ouput:
* Return:
* 0 成功
* -1 失败
*/
int thread_pool_execute(thread_pool_t *tp_p, thread_task_t *tt_p)
{
if(tp_p == NULL || tt_p==NULL) {
return -1;
}
/* 先锁lock再锁newtasklock */
pthread_mutex_lock(&tp_p->lock);
pthread_mutex_lock(&tp_p->newtasklock); // 如果还不允许添加新任务则阻塞
pthread_mutex_unlock(&tp_p->newtasklock);
/* 线程池已处于关闭状态 */
if(tp_p->status <= 0) {
pthread_mutex_unlock(&tp_p->lock);
return -1;
}
/* 如果所有已创建的线程都在忙碌且线程池还未满则创建一个新的线程 */
if(tp_p->running_cnt>=tp_p->thread_num && tp_p->thread_num<tp_p->pool_size) {
int thread_num = tp_p->thread_num++;
pthread_mutex_unlock(&tp_p->lock);
pthread_create(&tp_p->pool_thread[thread_num], NULL, thread_execute, tp_p);
pthread_mutex_lock(&tp_p->lock);
}
/* 任务入队 */
int nrear = tp_p->queue_rear+1; // nrear = 尾指针后移一格
if(nrear >= tp_p->queue_size) nrear = 0;
if(nrear == tp_p->queue_front) { // 队列已满
pthread_mutex_unlock(&tp_p->lock);
return -1;
}
tp_p->work_queue[tp_p->queue_rear] = *tt_p; // 入队
tp_p->queue_rear = nrear;
/* 存在空闲的睡眠线程,唤醒线程去执行任务 */
if(tp_p->running_cnt < tp_p->thread_num) {
/* 轮询直到任务真正被唤醒的线程从队列取出,这儿用轮询因为个人觉得等待时间比较短没必要用条件变量增加开销 */
pthread_mutex_lock(&tp_p->newtasklock); // 还未确定任务是否被新线程从队列取出,暂时不允许添加新任务
pthread_mutex_unlock(&tp_p->lock);
pthread_cond_signal(&tp_p->wakeup); // 唤醒线程
/* 读取这三个临界资源不必用互斥量保护不影响结果的正确性,此外三个变量均声明为volatile */
while(tp_p->queue_front!=tp_p->queue_rear && tp_p->status!=0) {} // 轮询直到线程池关闭或任务被取出
pthread_mutex_unlock(&tp_p->newtasklock); // 允许添加新任务了
} else {
pthread_mutex_unlock(&tp_p->lock);
}
return 0;
}
/**
* Function: thread_execute
* Description:
* static函数,仅在本文件可见
* 启动pthread线程所调用的函数
* 主要就是不停地从队列中取出任务执行
* Input:
* arg 线程所属的线程池
* Ouput:
* Return: NULL
*/
static void *thread_execute(void *arg)
{
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); // 设置线程为异步取消
thread_pool_t *tp_p = arg;
thread_task_t task;
for(;;) {
pthread_mutex_lock(&tp_p->lock);
/* 线程池已关闭且队列中没有未执行的任务 */
if(tp_p->status<=0 && tp_p->queue_rear==tp_p->queue_front) {
pthread_mutex_unlock(&tp_p->lock);
break;
}
/* 等待队列任务的到来 */
while(tp_p->queue_rear == tp_p->queue_front) {
pthread_cond_wait(&tp_p->wakeup, &tp_p->lock);
/* 线程池处于关闭状态且队列没有未执行任务,准备终止线程 */
if(tp_p->status<=0 && tp_p->queue_rear==tp_p->queue_front) {
pthread_mutex_unlock(&tp_p->lock);
goto EXIT;
}
}
/* 从队首取出任务 */
task = tp_p->work_queue[tp_p->queue_front++];
if(tp_p->queue_front >= tp_p->queue_size) tp_p->queue_front = 0;
/* 执行任务 */
++tp_p->running_cnt; // 正在执行任务的线程数+1
pthread_mutex_unlock(&tp_p->lock);
task.func(task.argv); // 调用执行任务的函数
pthread_mutex_lock(&tp_p->lock);
--tp_p->running_cnt; // 正在执行任务的线程数-1
pthread_mutex_unlock(&tp_p->lock);
}
EXIT: // 准备终止线程
return NULL;
}


问题

性能方面的测试不清楚怎么测,要在不同的具体使用线程池场景,和不同的CPU核心数测试吧。。

实现起来感觉有些细节问题还是有些不好把握的,比如感觉锁的粒度有点大,只有一个lock锁来保护所有临界资源,可以考虑一个临界资源对应一个锁?不过粒度太小,频繁加锁解锁开销肯定是有的,这个可能需要实际测试性能来权衡吧。

这儿我想提的是测试时发现的一个问题,感觉还是挺有意义的。


问题的出现与分析

execute()里一开始我的实现是把任务放入队列,然后调用pthread_cond_signal()就完事了。可是用如下测试程序测试时出现了问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* Copyright (C), 2017, progrom.
* File name: test.c
* Author: progrom
* Version: 0.01
* Date: 20170806
* Description: 简单地测试一下线程池
*/
#include "thread_pool.h"
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
/* the arg of a task */
typedef struct task_arg {
int num; /* the number of the task */
int sec; /* working time of the task */
} task_arg;
void func(void *arg)
{
task_arg *ta = arg;
printf("\tI am task #%d and I'm going to sleep for %d second.\n", ta->num, ta->sec);
sleep(ta->sec); // working...
printf("\tI am task #%d and I have finished.\n", ta->num);
}
int main()
{
thread_pool_t pool;
puts("init a thread pool with pool size of 3 and queue size of 2.\n");
thread_pool_init(&pool, 3, 2);
thread_task_t task;
task_arg args[5];
for(int i=0; i<5; ++i) {
args[i].num = i;
args[i].sec = 2;
thread_task_init(&task, func, &args[i]);
printf("execute #%d task\n", i);
int ret = thread_pool_execute(&pool, &task);
printf("success: %d\n", ret);
printf("pool.thread_num: %d\n", pool.thread_num);
printf("pool.running_cnt: %d\n", pool.running_cnt);
printf("pool.queue_front: %d\n", pool.queue_front);
printf("pool.queue_rear: %d\n", pool.queue_rear);
putchar('\n');
}
thread_pool_shutdown(&pool, 0); // wait until all tasks are finished
return 0;
}

测试结果是: 不合理的结果

这个测试相当于一种任务很密集的场景,线程池接受一个任务后立马又接受下一个任务,由于互斥量,线程对象迟迟不能从队列中取出任务执行。

所以看到的结果是:

  • 线程池从始至终只创建了一个一直很空闲的线程
  • 任务们却一直阻塞迟迟不能得到执行
  • 直到任务缓存队列被塞满了,后面任务全部都被丢弃了
  • 终于不在有新任务出现CPU空闲了,这时那个唯一的线程才取出队列中的任务,一个接着一个单线程地执行完

问题的解决方法

我想了想这个问题的解决方法:可以令pthread_cond_signal()调用后确保唤醒的线程一定会从队列中取出任务而不被新任务的添加打断。

为了实现这个效果,我加入了一个互斥量newtasklock来阻止新任务的添加,并且用一个循环轮询直到队列的任务被取出。见代码段:

1
2
3
4
5
6
7
8
9
10
11
/* 存在空闲的睡眠线程,唤醒线程去执行任务 */
if(tp_p->running_cnt < tp_p->thread_num) {
pthread_cond_signal(&tp_p->wakeup); // 唤醒线程
/* 轮询直到任务真正被唤醒的线程从队列取出,这儿用轮询因为个人觉得等待时间比较短没必要用条件变量增加开销 */
pthread_mutex_lock(&tp_p->newtasklock); // 还未确定任务是否被新线程从队列取出,暂时不允许添加新任务
/* 读取这三个临界资源不必用互斥量保护不影响结果的正确性,此外三个变量均声明为volatile */
while(tp_p->queue_front!=tp_p->queue_rear && tp_p->status!=0) {} // 轮询直到线程池关闭或任务被取出
pthread_mutex_unlock(&tp_p->newtasklock); // 允许添加新任务了
}

while轮询

这儿用while循环轮询,上面注释也有说到原因:

  1. 结果正确性不影响;
  2. 唤醒线程取任务出来是个很快的过程,没必要加个条件变量去通知。这就如同明明是一件很快就会发生的事情,完全没必要去通知另一个人告诉它这件事发生时告诉我,通知另一个人的时候说不定事情就发生了,所以自己静静等待即可。

我认为应该挺科学的。。

volatile

此外这三个变量声明为volatile(如下代码段),这样就不怕编译器的优化导致死循环,虽然可能不会出现编译器优化的情况,不过稳点最好。

1
2
3
4
5
6
7
8
9
10
11
12
13
/* 线程池结构体 */
typedef struct thread_pool_t {
/*。。。省略。。。*/
volatile int queue_front; /* 队列头指针 */
volatile int queue_rear; /* 队列尾指针 */
volatile int status; /* 线程池状态:0 关闭; 1 开启*/
/*。。。省略。。。*/
} thread_pool_t;

加锁顺序

还有一点我想说到的是加锁的顺序,众所周知规定各个互斥量的加锁顺序可以预防死锁,而此外我发现加锁的顺序还是有很多微妙的地方。

thread_pool_execute()函数里,如果是先锁newtasklock再锁lock。那么在唤醒线程执行任务那儿,锁newtasklock前必须先释放已经锁上的lock,不然会产生死锁。而在释放lock后,另一个线程可能立刻调用thread_pool_execute()获得两把锁,打断了原子性。

而如果加锁顺序反过来则不会,这东西不好描述。。我觉得先锁lock再锁newtasklock这一手有点妙。这两把锁加锁的模型大概如下:

1
2
3
4
5
6
7
/* 先lock1,后lock2 */
lock1
lock2
unlock1
unlock2
// <-空隙
lock1

1
2
3
4
5
6
/* 先lock2,后lock1 */
lock2
lock1
unlock1
lock1
unlock2

总之,我觉得加锁顺序是个很微妙的东西,需要仔细考虑。


问题的解决

于是,最终的结果终于合理了: 合理的结果