手写线程池

线程池原理

当我们使用线程的时候就去创建一个线程,这样的实现虽然很简便,但是在并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束的时候,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

线程池的作用就是复用线程,当一个线程执行完任务后,并不被销毁,而是可以继续执行其他任务。

线程池的组成:

  1. 任务队列,存储需要处理的任务,由工作的线程来处理这些任务。
    • 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除。
    • 已处理的任务会被从任务队列中删除。
    • 线程池的使用者,调用线程池函数往任务队列中添加任务的线程(生产者线程)。
  2. 工作的线程(任务队列任务的消费者),N个
    • 线程池中维护了一定数量的工作线程,他们不停的读任务队列,从里边取出任务并处理。
    • 工作的线程相当于是任务队列的消费者角色。
    • 如果任务队列为空,工作的线程会被阻塞
    • 如果阻塞后有了新的任务,由生产者将阻塞解除。
  3. 管理者线程,1个
    • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
      • 当任务过多的时候,可以适当创建一些新的工作线程。
      • 当任务过少的时候,可以适当销毁一些工作线程。

ThreadPool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct ThreadPool
{
// 任务队列
Task* taskQ;
int queueCapacity;
int queueSize;
int queueFront;
int queueRear;

pthread_t managerID;
pthread_t* threadIDs; // 线程ID数组,用来存储工作的线程ID
int minNum; // 最小线程数
int maxNum; // 最大线程数
int busyNum; // 忙的线程数
int liveNum; // 存活的线程个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 线程池锁
pthread_mutex_t mutexBusy; // busyNum变量锁
pthread_cond_t notFull; // 任务队列是否满了
pthread_cond_t notEmpty; // 任务队列是否为空

int shutdown; // 是否要销毁整个线程池,销毁为1,不销毁为0

};

Task

1
2
3
4
5
6
// 任务,包含两个成员,分别是线程需要执行的函数,已经函数携带的参数列表。
typedef struct Task
{
void (*function) (void* arg);
void* arg;
}Task;

threadPoolCreate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do {
if(pool == NULL) {
printf("malloc threadpool fail...\n");
break;
}

pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->threadIDs == NULL) {
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t)*max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min;
pool->exitNum = 0;

if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL)
) {
printf("mutex or condition init fail...\n");
break;
}

pool->taskQ = (Task*)malloc(sizeof(Task)*queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;

pthread_create(&pool->managerID, NULL, manager, pool);
for (int i = 0; i < min; i ++) {
pthread_create(&pool->threadIDs[i],NULL, worker, pool);
}

return pool;
} while(0);

if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);

return NULL;
}

addTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg) {
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == pool->queueCapacity) {
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize ++;

pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}

获取BusyNum和liveNum

1
2
3
4
5
6
7
8
9
10
11
12
13
int threadPoolBusyNum(ThreadPool *pool) {
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}

int threadPoolAliveNum(ThreadPool *pool) {
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
}

threadPoolDestroy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int threadPoolDestroy(ThreadPool *pool) {
if(pool == NULL) {
return -1;
}
// 关闭线程池
pool->shutdown = 1;
// 收回管理者线程
pthread_join(pool->managerID, NULL);
// 唤醒阻塞的消费者
for (int i = 0; i < pool->liveNum; ++i) {
pthread_cond_signal(&pool->notEmpty);
}
// 释放堆内存
if (pool->taskQ) {
free(pool->taskQ);
}
if(pool->threadIDs) {
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}

worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
void* worker(void* arg) {
ThreadPool* pool = (ThreadPool*) arg;

while(1) {
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == 0 && !pool->shutdown) {
// 阻塞
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

// 判断是否要销毁线程
if(pool->exitNum > 0) {
pool->exitNum--;
if(pool->liveNum > pool->minNum) {
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}

if(pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}

Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;

// 唤醒生产者
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);

printf("thread %ld start working... \n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum ++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread %ld end working... \n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum --;
pthread_mutex_unlock(&pool->mutexBusy);

}
}

manager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void* manager(void* arg) {
ThreadPool* pool = (ThreadPool*) arg;
while(!pool->shutdown) {
// 每三秒检测一次
sleep(3);
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);

pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);

// 添加线程
// 任务个数>存活的线程个数 && 存活线程数<最大线程数
if(queueSize > liveNum && liveNum < pool->maxNum) {
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i) {
if(pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter ++;
pool->liveNum++;

}
}

pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程 > 最小线程数
if (busyNum*2 < liveNum && liveNum > pool->minNum) {

pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);

// 唤醒阻塞的工作线程,让其自杀
for (int i = 0; i < NUMBER; i ++) {
pthread_cond_signal(&pool->notEmpty);
}
}
}

}

threadExit

1
2
3
4
5
6
7
8
9
10
11
void threadExit(ThreadPool* pool) {
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; i ++) {
if(pool->threadIDs[i] == tid) {
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}

threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include <pthread.h>
typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif

测试线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include "threadpool.h"

void taskFunc(void* arg) {
int num = *(int*)arg;
printf("thread is working, number = %d, tid = %ld\n", num, pthread_self());
usleep(1000);
}

int main() {
// 创建线程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for(int i = 0; i < 100; ++i) {
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}

sleep(30);

threadPoolDestroy(pool);
return 0;
}

编译并运行

image-20240327130325254

运行结果

image-20240327130241273