手写线程池
线程池原理
当我们使用线程的时候就去创建一个线程,这样的实现虽然很简便,但是在并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束的时候,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
线程池的作用就是复用线程,当一个线程执行完任务后,并不被销毁,而是可以继续执行其他任务。
线程池的组成:
- 任务队列,存储需要处理的任务,由工作的线程来处理这些任务。
- 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除。
- 已处理的任务会被从任务队列中删除。
- 线程池的使用者,调用线程池函数往任务队列中添加任务的线程(生产者线程)。
- 工作的线程(任务队列任务的消费者),N个
- 线程池中维护了一定数量的工作线程,他们不停的读任务队列,从里边取出任务并处理。
- 工作的线程相当于是任务队列的消费者角色。
- 如果任务队列为空,工作的线程会被阻塞
- 如果阻塞后有了新的任务,由生产者将阻塞解除。
- 管理者线程,1个
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
- 当任务过多的时候,可以适当创建一些新的工作线程。
- 当任务过少的时候,可以适当销毁一些工作线程。
ThreadPool.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| struct ThreadPool { Task* taskQ; int queueCapacity; int queueSize; int queueFront; int queueRear; pthread_t managerID; pthread_t* threadIDs; int minNum; int maxNum; int busyNum; int liveNum; int exitNum; pthread_mutex_t mutexPool; pthread_mutex_t mutexBusy; pthread_cond_t notFull; pthread_cond_t notEmpty;
int shutdown;
};
|
Task
1 2 3 4 5 6
| typedef struct Task { void (*function) (void* arg); void* arg; }Task;
|
threadPoolCreate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| ThreadPool* threadPoolCreate(int min, int max, int queueSize) { ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool)); do { if(pool == NULL) { printf("malloc threadpool fail...\n"); break; }
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max); if(pool->threadIDs == NULL) { printf("malloc threadIDs fail...\n"); break; } memset(pool->threadIDs, 0, sizeof(pthread_t)*max); pool->minNum = min; pool->maxNum = max; pool->busyNum = 0; pool->liveNum = min; pool->exitNum = 0;
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || pthread_cond_init(&pool->notEmpty, NULL) != 0 || pthread_cond_init(&pool->notFull, NULL) ) { printf("mutex or condition init fail...\n"); break; }
pool->taskQ = (Task*)malloc(sizeof(Task)*queueSize); pool->queueCapacity = queueSize; pool->queueSize = 0; pool->queueFront = 0; pool->queueRear = 0; pool->shutdown = 0;
pthread_create(&pool->managerID, NULL, manager, pool); for (int i = 0; i < min; i ++) { pthread_create(&pool->threadIDs[i],NULL, worker, pool); } return pool; } while(0);
if (pool && pool->threadIDs) free(pool->threadIDs); if (pool && pool->taskQ) free(pool->taskQ); if (pool) free(pool);
return NULL; }
|
addTask
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg) { pthread_mutex_lock(&pool->mutexPool); while(pool->queueSize == pool->queueCapacity) { pthread_cond_wait(&pool->notFull, &pool->mutexPool); } if (pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); return; } pool->taskQ[pool->queueRear].function = func; pool->taskQ[pool->queueRear].arg = arg; pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity; pool->queueSize ++;
pthread_cond_signal(&pool->notEmpty); pthread_mutex_unlock(&pool->mutexPool); }
|
获取BusyNum和liveNum
1 2 3 4 5 6 7 8 9 10 11 12 13
| int threadPoolBusyNum(ThreadPool *pool) { pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy); return busyNum; }
int threadPoolAliveNum(ThreadPool *pool) { pthread_mutex_lock(&pool->mutexPool); int aliveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool); return aliveNum; }
|
threadPoolDestroy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| int threadPoolDestroy(ThreadPool *pool) { if(pool == NULL) { return -1; } pool->shutdown = 1; pthread_join(pool->managerID, NULL); for (int i = 0; i < pool->liveNum; ++i) { pthread_cond_signal(&pool->notEmpty); } if (pool->taskQ) { free(pool->taskQ); } if(pool->threadIDs) { free(pool->threadIDs); } pthread_mutex_destroy(&pool->mutexPool); pthread_mutex_destroy(&pool->mutexBusy); pthread_cond_destroy(&pool->notEmpty); pthread_cond_destroy(&pool->notFull); free(pool); pool = NULL; return 0; }
|
worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| void* worker(void* arg) { ThreadPool* pool = (ThreadPool*) arg;
while(1) { pthread_mutex_lock(&pool->mutexPool); while(pool->queueSize == 0 && !pool->shutdown) { pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
if(pool->exitNum > 0) { pool->exitNum--; if(pool->liveNum > pool->minNum) { pool->liveNum--; pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); } } } if(pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); }
Task task; task.function = pool->taskQ[pool->queueFront].function; task.arg = pool->taskQ[pool->queueFront].arg; pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; pool->queueSize--;
pthread_cond_signal(&pool->notFull); pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working... \n", pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum ++; pthread_mutex_unlock(&pool->mutexBusy); task.function(task.arg); free(task.arg); task.arg = NULL; printf("thread %ld end working... \n", pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum --; pthread_mutex_unlock(&pool->mutexBusy);
} }
|
manager
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| void* manager(void* arg) { ThreadPool* pool = (ThreadPool*) arg; while(!pool->shutdown) { sleep(3); pthread_mutex_lock(&pool->mutexPool); int queueSize = pool->queueSize; int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool);
pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy);
if(queueSize > liveNum && liveNum < pool->maxNum) { pthread_mutex_lock(&pool->mutexPool); int counter = 0; for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i) { if(pool->threadIDs[i] == 0) { pthread_create(&pool->threadIDs[i], NULL, worker, pool); counter ++; pool->liveNum++; } }
pthread_mutex_unlock(&pool->mutexPool); } if (busyNum*2 < liveNum && liveNum > pool->minNum) {
pthread_mutex_lock(&pool->mutexPool); pool->exitNum = NUMBER; pthread_mutex_unlock(&pool->mutexPool); for (int i = 0; i < NUMBER; i ++) { pthread_cond_signal(&pool->notEmpty); } } }
}
|
threadExit
1 2 3 4 5 6 7 8 9 10 11
| void threadExit(ThreadPool* pool) { pthread_t tid = pthread_self(); for (int i = 0; i < pool->maxNum; i ++) { if(pool->threadIDs[i] == tid) { pool->threadIDs[i] = 0; printf("threadExit() called, %ld exiting...\n", tid); break; } } pthread_exit(NULL); }
|
threadpool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| #ifndef _THREADPOOL_H #define _THREADPOOL_H
#include <pthread.h> typedef struct ThreadPool ThreadPool;
ThreadPool* threadPoolCreate(int min, int max, int queueSize);
int threadPoolDestroy(ThreadPool* pool);
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
int threadPoolBusyNum(ThreadPool* pool);
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg); void* manager(void* arg); void threadExit(ThreadPool* pool); #endif
|
测试线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| #include <stdio.h> #include <pthread.h> #include <unistd.h> #include <stdlib.h> #include "threadpool.h"
void taskFunc(void* arg) { int num = *(int*)arg; printf("thread is working, number = %d, tid = %ld\n", num, pthread_self()); usleep(1000); }
int main() { ThreadPool* pool = threadPoolCreate(3, 10, 100); for(int i = 0; i < 100; ++i) { int* num = (int*)malloc(sizeof(int)); *num = i + 100; threadPoolAdd(pool, taskFunc, num); }
sleep(30);
threadPoolDestroy(pool); return 0; }
|
编译并运行
运行结果