线程同步
同步方式
对于多个线程访问共享资源(临界资源)出现数据混乱的问题,需要进行线程同步。常见的线程同步方式有四种:互斥锁、读写锁、条件变量、信号量。
互斥锁
通过互斥锁可以锁定一个代码块,被锁定的代码块,所有线程只能顺序执行(不能并行)。这样多线程访问共享资源数据混乱的问题就得到解决,需要付出的代价是执行效率降低,因为从并行处理退化到了串行处理。
1 2 3 4 5 6 7 pthread_mutex_t mutex; int pthread_mutex_init (pthread_mutex_t *restrict mutex, pthread_mutex_t *restrict attr) ;int pthread_mutex_destroy (pthread_mutex_t *mutex) ;
参数:
mutex:互斥锁变量的地址
attr:互斥锁的属性,一般使用默认即可,指定为NULL
1 int pthread_mutex_lock (pthread_mutex_t *mutex)
该函数被调用时,首先会判断参数mutex互斥锁中的状态是否为锁定:
如果没有被锁定,这个线程可以加锁成功,锁中会记录是哪个线程加锁成功了。
如果被锁定了,其他线程加锁就失败了,这些线程都会阻塞在这里。
当这把锁被解开之后,这些阻塞在锁上的线程就接触阻塞了,并且这些线程是通过竞争的方式对这把锁加锁,没抢到锁的线程继续阻塞。
1 int pthread_mutex_trylock (pthread_mutex_t *mutex) ;
如果这把锁没有被锁定,线程加锁成功
如果锁被锁定了,调用这个函数加锁的线程不会被阻塞,加锁失败直接返回错误号。
1 2 int pthread_mutex_unlock (pthread_mutex_t *mutex) ;
利用互斥锁进行线程同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <pthread.h> #define MAX 50 int number;pthread_mutex_t mutex;void * funcA_num (void * arg) { for (int i = 0 ; i < MAX; i ++) { pthread_mutex_lock(&mutex); int cur = number; cur++; usleep(10 ); number = cur; printf ("Thread A, id = %lu, number = %d\n" , pthread_self(), number); pthread_mutex_unlock(&mutex); } return NULL ; } void * funcB_num (void * arg) { for (int i = 0 ; i < MAX; i ++) { pthread_mutex_lock(&mutex); int cur = number; cur++; number = cur; printf ("Thread B, id = %lu, number = %d\n" , pthread_self(), number); pthread_mutex_unlock(&mutex); usleep(10 ); } return NULL ; } int main (int agrc, const char * argv[]) { pthread_t p1, p2; pthread_mutex_init(&mutex, NULL ); pthread_create(&p1, NULL , funcA_num, NULL ); pthread_create(&p2, NULL , funcB_num, NULL ); pthread_join(p1, NULL ); pthread_join(p2, NULL ); pthread_mutex_destroy(&mutex); return 0 ; }
运行结果,成功完成了100次计数。
死锁
当多个线程访问临界资源时,如果加锁不当,就会造成死锁。死锁造成的后果是,所有线程都被阻塞,并且线程的阻塞时无法解开的。
造成死锁的场景:
加锁后忘记解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void func() { for (int i = 0; i < 6; i ++) { pthread_mutex_lock(&mutex); ... ... // 忘记解锁 } } void func() { for(int i = 0; i < 6; i ++) { pthread_mutex_lock(&mutex); if(...) { return; } pthread_mutex_lock(&mutex); } }
重复加锁,造成死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void func () { for (int i = 0 ; i < 6 ; i ++) { pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex); ... ... pthread_mutex_unlock(&mutex); } } void funcA () { for (int i = 0 ; i < 6 ; i ++) { pthread_mutex_lock(&mutex); ... ... pthread_mutex_unlock(&mutex); } } void funcB () { for (int i = 0 ; i < 6 ; i ++) { pthread_mutex_lock(&mutex); ... funcA(); ... pthread_mutex_unlock(&mutex); } } }
循环
读写锁
读写锁是互斥锁的升级版,在做读操作的时候可以提高程序的执行效率,如果所有的线程都是做读操作,那么读是并行,但是使用互斥锁,读操作也是串行的。
1 pthread_rwlock_t rwlock;
锁中记录的信息:
锁的状态:锁定/打开
锁定的是什么操作:读操作/写操作(锁定了读操作,需要先解锁才能去锁定写操作,反之亦然。
哪个线程进行的加锁
读写锁的特点:
使用读写锁的读锁锁定了临界区,线程对临界区的访问是并行的,读锁是共享的。
使用读写锁的写锁锁定了临界区,线程对临界区的访问是串行的,写锁是独占的。
写锁比读锁的优先级高。
程序中所有线程都对共享资源有写也有读操作,并且对共享资源读的操作越多,读写锁更有优势。
读写锁函数
1 2 3 4 5 6 7 #include <pthread.h> pthread_rwlock_t rwlock;int pthread_rwlock_init (pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr) ;int pthread_rwlock_destory (pthread_rwlock_t *rwlock) ;
参数:
rwlock:读写锁的地址,传出参数
attr:读写锁属性,一般使用默认属性,指定为NULL
1 2 int pthread_rwlock_rdlock (pthread_rwlock_t *rwlock) ;
调用这个函数时,如果读写锁打开,则加锁成功;如果读写锁已经锁定为读,则仍然可以加锁成功;如果读写锁已经锁定了写操作,则会被阻塞。
1 2 int pthread_rwlock_tryrdlock (pthread_rwlock_t *rwlock) ;
调用这个函数尝试加锁,如果加锁失败返回错误号,且线程不会被阻塞。
1 2 int pthread_rwlock_wrlock (pthread_rwlock_t *rwlock) ;
调用时,如果读写锁打开,那么加锁成功;如果读写锁已经被锁定,则会被阻塞。
读写锁使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <pthread.h> #define MAX 50 int number;pthread_rwlock_t rwlock;void * read_num (void * arg) { for (int i = 0 ; i < MAX; i ++) { pthread_rwlock_rdlock(&rwlock); printf ("Thread read, id = %lu, number = %d\n" , pthread_self(), number); pthread_rwlock_unlock(&rwlock); usleep(rand()%5 ); } } void * write_num (void * arg) { for (int i = 0 ; i < MAX; i ++) { pthread_rwlock_wrlock(&rwlock); int cur = number; cur ++; number = cur; printf ("Thread write, id = %lu, number = %d\n" , pthread_self(), number); pthread_rwlock_unlock(&rwlock); usleep(5 ); } } int main (int argc, const char * argv[]) { pthread_t p1[5 ], p2[3 ]; pthread_rwlock_init(&rwlock, NULL ); for (int i = 0 ; i < 5 ; i ++) { pthread_create(&p1[i], NULL , read_num, NULL ); } for (int i = 0 ; i < 3 ; i ++) { pthread_create(&p2[i], NULL , write_num, NULL ); } for (int i = 0 ; i < 3 ; i ++) { pthread_join(p2[i],NULL ); } for (int i = 0 ; i < 5 ; i ++) { pthread_join(p1[i], NULL ); } pthread_rwlock_destroy(&rwlock); }
条件变量
一般用于处理生产者和消费者模型。
被条件变量阻塞的线程的线程信息会被记录到这个变量中,以便在解除阻塞的时候使用。
条件变量函数
1 2 3 4 5 6 7 8 #include <pthread.h> pthread_cond_t cond;int pthread_cond_init (pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr) ;int pthread_cond_destroy (pthread_cond_t *cond) ;
参数:
cond:条件变量的地址
attr:条件变量属性,一般使用默认属性,NULL。
1 2 int pthread_cond_wait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex) ;
1 2 3 4 5 6 7 8 struct timespec { time_t tv_sec; long tv_nsec; } int pthread_cond_timedwait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime) ;
struct timespec 的赋值
1 2 3 4 time_t mytim = time(NULL ); struct timespec tmsp ;tmsp.tv_nsec = 0 ; tmsp.tv_sec = time(NULL ) + 100 ;
1 2 3 4 int pthread_cond_signal (pthread_cond_t *cond) ;int pthread_cond_broadcast (pthread_cond_t *cond) ;
生产者和消费者
使用条件变量实现生产者和消费者模型,生产者有5个,往链表头部添加节点,消费者也有5个,删除链表头部的节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <pthread.h> pthread_cond_t cond;pthread_mutex_t mutex;struct Node { struct Node * next ; int number; }; struct Node * head = NULL ;void * producer (void * arg) { while (1 ) { pthread_mutex_lock(&mutex); struct Node * newNode = (struct Node*)malloc (sizeof (struct Node)); newNode->next = head; newNode->number = rand() % 1000 ; head = newNode; printf ("生产者, id:%ld, number:%d\n" ,pthread_self(),newNode->number); pthread_mutex_unlock(&mutex); pthread_cond_broadcast(&cond); sleep(rand()%3 ); } return NULL ; } void * cosumer (void * arg) { while (1 ) { pthread_mutex_lock(&mutex); while (head == NULL ) { pthread_cond_wait(&cond, &mutex); } struct Node * node = head; printf ("消费者, id:%ld, number:%d\n" , pthread_self(), node->number); head = head->next; free (node); pthread_mutex_unlock(&mutex); sleep(rand()%3 ); } return NULL ; } int main () { pthread_cond_init(&cond, NULL ); pthread_mutex_init(&mutex, NULL ); pthread_t t1[5 ], t2[5 ]; for (int i = 0 ; i < 5 ; i ++) { pthread_create(&t1[i], NULL , producer, NULL ); } for (int i = 0 ; i < 5 ; i ++) { pthread_create(&t2[i], NULL , cosumer, NULL ); } for (int i = 0 ; i < 5 ; i ++) { pthread_join(t1[i], NULL ); } for (int i = 0 ; i < 5 ; i ++) { pthread_join(t2[i], NULL ); } pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); return 0 ; }
信号量
信号量函数
信号量和条件变量一样用于处理生产者和消费者模型,用于阻塞生产者线程或消费者线程的运行。信号量的类型为sem_t对应的头文件为<semaphore.h>
1 2 #include <semaphore.h> sem_t sem;
1 2 3 4 // 初始化信号量 int sem_init(sem_t *sem, int pshared, unsigned int value); //资源释放,线程销毁后调用 int sem_destory(sem_t *sem);
参数
sem:信号量变量地址
pshared:
value:初始化当前信号量拥有的资源数,如果资源数为0,线程就会被阻塞了。
1 2 int sem_wait (sem_t *sem) ;
当线程调用这个函数,并且sem中的资源数>0,线程不会阻塞,线程会占用sem中的一个资源,因此资源数-1,直到资源数减为0,资源被耗尽,因此线程被阻塞。
1 int sem_trywait (sem_t *sem) ;
资源被耗尽时,线程也不会阻塞,函数返回一个错误号。
1 int sem_post (sem_t *sem) ;
使资源数+1。
生产者和消费者
情形一,资源总数为1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <pthread.h> #include <semaphore.h> sem_t semp;sem_t semc;pthread_mutex_t mutex;struct Node { struct Node * next ; int number; }; struct Node * head = NULL ;void * producer (void * arg) { while (1 ) { sem_wait(&semp); struct Node * newNode = (struct Node*)malloc (sizeof (struct Node)); newNode->next = head; newNode->number = rand() % 1000 ; head = newNode; printf ("生产者, id:%ld, number:%d\n" ,pthread_self(),newNode->number); sem_post(&semc); sleep(rand()%3 ); } return NULL ; } void * cosumer (void * arg) { while (1 ) { sem_wait(&semc); struct Node * node = head; printf ("消费者, id:%ld, number:%d\n" , pthread_self(), node->number); head = head->next; free (node); sem_post(&semp); sleep(rand()%3 ); } return NULL ; } int main () { sem_init(&semc,0 ,0 ); sem_init(&semp,0 ,1 ); pthread_mutex_init(&mutex, NULL ); pthread_t t1[5 ], t2[5 ]; for (int i = 0 ; i < 5 ; i ++) { pthread_create(&t1[i], NULL , producer, NULL ); } for (int i = 0 ; i < 5 ; i ++) { pthread_create(&t2[i], NULL , cosumer, NULL ); } for (int i = 0 ; i < 5 ; i ++) { pthread_join(t1[i], NULL ); } for (int i = 0 ; i < 5 ; i ++) { pthread_join(t2[i], NULL ); } sem_destroy(&semc); sem_destroy(&semp); pthread_mutex_destroy(&mutex); return 0 ; }
情形2,资源总数大于1时,需要和锁配合使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void * producer (void * arg) { while (1 ) { sem_wait(&semp); pthread_mutex_lock(&mutex); struct Node * newNode = (struct Node*)malloc (sizeof (struct Node)); newNode->next = head; newNode->number = rand() % 1000 ; head = newNode; printf ("生产者, id:%ld, number:%d\n" ,pthread_self(),newNode->number); pthread_mutex_unlock(&mutex); sem_post(&semc); sleep(rand()%3 ); } return NULL ; } void * cosumer (void * arg) { while (1 ) { sem_wait(&semc); pthread_mutex_lock(&mutex); struct Node * node = head; printf ("消费者, id:%ld, number:%d\n" , pthread_self(), node->number); head = head->next; free (node); pthread_mutex_unlock(&mutex); sem_post(&semp); sleep(rand()%3 ); } return NULL ; }