线程同步

同步方式

对于多个线程访问共享资源(临界资源)出现数据混乱的问题,需要进行线程同步。常见的线程同步方式有四种:互斥锁、读写锁、条件变量、信号量。

互斥锁

通过互斥锁可以锁定一个代码块,被锁定的代码块,所有线程只能顺序执行(不能并行)。这样多线程访问共享资源数据混乱的问题就得到解决,需要付出的代价是执行效率降低,因为从并行处理退化到了串行处理。

1
2
3
4
5
6
7
pthread_mutex_t mutex; //保存了锁的状态信息与线程ID

// 初始化互斥锁
// restrict 只有这个关键字修饰的指针可以访问指向的内存地址,其他指针不行。
int pthread_mutex_init(pthread_mutex_t *restrict mutex, pthread_mutex_t *restrict attr);
// 释放互斥锁资源
int pthread_mutex_destroy(pthread_mutex_t *mutex);
  • 参数:
    • mutex:互斥锁变量的地址
    • attr:互斥锁的属性,一般使用默认即可,指定为NULL
1
int pthread_mutex_lock(pthread_mutex_t *mutex)

该函数被调用时,首先会判断参数mutex互斥锁中的状态是否为锁定:

  • 如果没有被锁定,这个线程可以加锁成功,锁中会记录是哪个线程加锁成功了。
  • 如果被锁定了,其他线程加锁就失败了,这些线程都会阻塞在这里。
  • 当这把锁被解开之后,这些阻塞在锁上的线程就接触阻塞了,并且这些线程是通过竞争的方式对这把锁加锁,没抢到锁的线程继续阻塞。
1
int pthread_mutex_trylock(pthread_mutex_t *mutex);
  • 如果这把锁没有被锁定,线程加锁成功
  • 如果锁被锁定了,调用这个函数加锁的线程不会被阻塞,加锁失败直接返回错误号。
1
2
// 对互斥锁解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);

利用互斥锁进行线程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>

#define MAX 50

int number;
pthread_mutex_t mutex;

void* funcA_num(void* arg) {
for(int i = 0; i < MAX; i ++) {
pthread_mutex_lock(&mutex);
int cur = number;
cur++;
usleep(10);
number = cur;
printf("Thread A, id = %lu, number = %d\n", pthread_self(), number);
pthread_mutex_unlock(&mutex);
}
return NULL;
}

void* funcB_num(void* arg) {
for(int i = 0; i < MAX; i ++) {
pthread_mutex_lock(&mutex);
int cur = number;
cur++;
number = cur;
printf("Thread B, id = %lu, number = %d\n", pthread_self(), number);
pthread_mutex_unlock(&mutex);
usleep(10);
}
return NULL;
}

int main(int agrc, const char* argv[]) {
pthread_t p1, p2;
pthread_mutex_init(&mutex, NULL);
pthread_create(&p1, NULL, funcA_num, NULL);
pthread_create(&p2, NULL, funcB_num, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
pthread_mutex_destroy(&mutex);
return 0;
}

运行结果,成功完成了100次计数。

image-20240315154936287

死锁

当多个线程访问临界资源时,如果加锁不当,就会造成死锁。死锁造成的后果是,所有线程都被阻塞,并且线程的阻塞时无法解开的。

造成死锁的场景:

  • 加锁后忘记解锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    void func() {
    for (int i = 0; i < 6; i ++) {
    pthread_mutex_lock(&mutex);
    ...
    ...
    // 忘记解锁
    }
    }

    void func() {
    for(int i = 0; i < 6; i ++) {
    pthread_mutex_lock(&mutex);
    if(...) {
    return;
    }
    pthread_mutex_lock(&mutex);
    }

    }
  • 重复加锁,造成死锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    void func() {
    for(int i = 0; i < 6; i ++) {
    pthread_mutex_lock(&mutex);

    pthread_mutex_lock(&mutex);
    ... ...
    pthread_mutex_unlock(&mutex);
    }
    }

    void funcA() {
    for(int i = 0; i < 6; i ++) {
    pthread_mutex_lock(&mutex);
    ...
    ...
    pthread_mutex_unlock(&mutex);
    }
    }

    void funcB() {
    for(int i = 0; i < 6; i ++) {
    pthread_mutex_lock(&mutex);
    ...
    funcA();
    ...
    pthread_mutex_unlock(&mutex);
    }
    }
    }
  • 循环

读写锁

读写锁是互斥锁的升级版,在做读操作的时候可以提高程序的执行效率,如果所有的线程都是做读操作,那么读是并行,但是使用互斥锁,读操作也是串行的。

1
pthread_rwlock_t rwlock;

锁中记录的信息:

  • 锁的状态:锁定/打开
  • 锁定的是什么操作:读操作/写操作(锁定了读操作,需要先解锁才能去锁定写操作,反之亦然。
  • 哪个线程进行的加锁

读写锁的特点:

  • 使用读写锁的读锁锁定了临界区,线程对临界区的访问是并行的,读锁是共享的。
  • 使用读写锁的写锁锁定了临界区,线程对临界区的访问是串行的,写锁是独占的。
  • 写锁比读锁的优先级高。

程序中所有线程都对共享资源有写也有读操作,并且对共享资源读的操作越多,读写锁更有优势。

读写锁函数

1
2
3
4
5
6
7
#include <pthread.h>
pthread_rwlock_t rwlock;
// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
// 释放读写锁占用的系统资源
int pthread_rwlock_destory(pthread_rwlock_t *rwlock);
  • 参数:
    • rwlock:读写锁的地址,传出参数
    • attr:读写锁属性,一般使用默认属性,指定为NULL
1
2
// 对读写锁加读锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

调用这个函数时,如果读写锁打开,则加锁成功;如果读写锁已经锁定为读,则仍然可以加锁成功;如果读写锁已经锁定了写操作,则会被阻塞。

1
2
// 加锁失败时不会被阻塞。
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

调用这个函数尝试加锁,如果加锁失败返回错误号,且线程不会被阻塞。

1
2
// 在程序中对读写锁加写锁,锁定写操作
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

调用时,如果读写锁打开,那么加锁成功;如果读写锁已经被锁定,则会被阻塞。

读写锁使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>

#define MAX 50

int number;

pthread_rwlock_t rwlock;

void* read_num(void* arg) {
for(int i = 0; i < MAX; i ++) {
pthread_rwlock_rdlock(&rwlock);
printf("Thread read, id = %lu, number = %d\n", pthread_self(), number);
pthread_rwlock_unlock(&rwlock);
usleep(rand()%5);
}
}

void* write_num(void* arg) {
for(int i = 0; i < MAX; i ++) {
pthread_rwlock_wrlock(&rwlock);
int cur = number;
cur ++;
number = cur;
printf("Thread write, id = %lu, number = %d\n", pthread_self(), number);
pthread_rwlock_unlock(&rwlock);
usleep(5);
}
}

int main(int argc, const char* argv[]) {
pthread_t p1[5], p2[3];
pthread_rwlock_init(&rwlock, NULL);
for(int i = 0; i < 5; i ++) {
pthread_create(&p1[i], NULL, read_num, NULL);
}
for(int i = 0; i < 3; i ++) {
pthread_create(&p2[i], NULL, write_num, NULL);
}
for(int i = 0; i < 3; i ++) {
pthread_join(p2[i],NULL);
}
for(int i = 0; i < 5; i ++) {
pthread_join(p1[i], NULL);
}
pthread_rwlock_destroy(&rwlock);
}

image-20240317144148391

条件变量

一般用于处理生产者和消费者模型。

1
pthread_cond_t cond;

被条件变量阻塞的线程的线程信息会被记录到这个变量中,以便在解除阻塞的时候使用。

条件变量函数

1
2
3
4
5
6
7
8
#include <pthread.h>
pthread_cond_t cond;
// 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
// 销毁并释放
int pthread_cond_destroy(pthread_cond_t *cond);

  • 参数:
    • cond:条件变量的地址
    • attr:条件变量属性,一般使用默认属性,NULL。
1
2
// 线程阻塞函数,哪个线程调用这个函数,哪个线程就会被阻塞。
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
1
2
3
4
5
6
7
8
// 表示的时间
struct timespec {
time_t tv_sec; // 秒
long tv_nsec; // 纳秒
}
// 将线程阻塞一定的时间长度,时间达到之后,线程就解除阻塞了。
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

struct timespec 的赋值

1
2
3
4
time_t mytim = time(NULL); // 当前总秒数
struct timespec tmsp;
tmsp.tv_nsec = 0;
tmsp.tv_sec = time(NULL) + 100; //线程阻塞100s
1
2
3
4
// 唤醒阻塞在条件变量上的线程,至少有一个被解除阻塞
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒阻塞在条件变量上的线程,被阻塞的线程全部解除阻塞
int pthread_cond_broadcast(pthread_cond_t *cond);

生产者和消费者

使用条件变量实现生产者和消费者模型,生产者有5个,往链表头部添加节点,消费者也有5个,删除链表头部的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>

pthread_cond_t cond;
pthread_mutex_t mutex;

struct Node {
struct Node* next;
int number;
};
struct Node* head = NULL;

void* producer(void* arg) {
while(1) {
pthread_mutex_lock(&mutex);
struct Node* newNode = (struct Node*)malloc(sizeof(struct Node));
newNode->next = head;
newNode->number = rand() % 1000;
head = newNode;
printf("生产者, id:%ld, number:%d\n",pthread_self(),newNode->number);
pthread_mutex_unlock(&mutex);
pthread_cond_broadcast(&cond);
sleep(rand()%3);
}
return NULL;
}

void* cosumer(void* arg) {
while(1) {
pthread_mutex_lock(&mutex);
while(head == NULL) {
// 阻塞消费者线程
pthread_cond_wait(&cond, &mutex);
}
struct Node* node = head;
printf("消费者, id:%ld, number:%d\n", pthread_self(), node->number);
head = head->next;
free(node);
pthread_mutex_unlock(&mutex);
sleep(rand()%3);
}
return NULL;
}

int main() {
pthread_cond_init(&cond, NULL);
pthread_mutex_init(&mutex, NULL);
pthread_t t1[5], t2[5];

for(int i = 0; i < 5; i ++) {
pthread_create(&t1[i], NULL, producer, NULL);
}
for(int i = 0; i < 5; i ++) {
pthread_create(&t2[i], NULL, cosumer, NULL);
}

for(int i = 0; i < 5; i ++) {
pthread_join(t1[i], NULL);
}
for(int i = 0; i < 5; i ++) {
pthread_join(t2[i], NULL);
}

pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
return 0;
}

image-20240318101142349

信号量

信号量函数

信号量和条件变量一样用于处理生产者和消费者模型,用于阻塞生产者线程或消费者线程的运行。信号量的类型为sem_t对应的头文件为<semaphore.h>

1
2
#include <semaphore.h>
sem_t sem;
1
2
3
4
// 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
//资源释放,线程销毁后调用
int sem_destory(sem_t *sem);
  • 参数
    • sem:信号量变量地址
    • pshared:
      • 0:线程同步
      • 非0:进程同步
    • value:初始化当前信号量拥有的资源数,如果资源数为0,线程就会被阻塞了。
1
2
// 函数被调用 sem中的资源就会被消耗一个,资源数-1
int sem_wait(sem_t *sem);

当线程调用这个函数,并且sem中的资源数>0,线程不会阻塞,线程会占用sem中的一个资源,因此资源数-1,直到资源数减为0,资源被耗尽,因此线程被阻塞。

1
int sem_trywait(sem_t *sem);

资源被耗尽时,线程也不会阻塞,函数返回一个错误号。

1
int sem_post(sem_t *sem);

使资源数+1。

生产者和消费者

情形一,资源总数为1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>

sem_t semp;
sem_t semc;
pthread_mutex_t mutex;

struct Node {
struct Node* next;
int number;
};
struct Node* head = NULL;

void* producer(void* arg) {
while(1) {
sem_wait(&semp);
struct Node* newNode = (struct Node*)malloc(sizeof(struct Node));
newNode->next = head;
newNode->number = rand() % 1000;
head = newNode;
printf("生产者, id:%ld, number:%d\n",pthread_self(),newNode->number);
sem_post(&semc);
sleep(rand()%3);
}
return NULL;
}

void* cosumer(void* arg) {
while(1) {
sem_wait(&semc);
struct Node* node = head;
printf("消费者, id:%ld, number:%d\n", pthread_self(), node->number);
head = head->next;
free(node);
sem_post(&semp);
sleep(rand()%3);
}
return NULL;
}

int main() {
sem_init(&semc,0,0);
sem_init(&semp,0,1);
pthread_mutex_init(&mutex, NULL);
pthread_t t1[5], t2[5];

for(int i = 0; i < 5; i ++) {
pthread_create(&t1[i], NULL, producer, NULL);
}
for(int i = 0; i < 5; i ++) {
pthread_create(&t2[i], NULL, cosumer, NULL);
}

for(int i = 0; i < 5; i ++) {
pthread_join(t1[i], NULL);
}
for(int i = 0; i < 5; i ++) {
pthread_join(t2[i], NULL);
}

sem_destroy(&semc);
sem_destroy(&semp);
pthread_mutex_destroy(&mutex);
return 0;
}

image-20240318104240968

情形2,资源总数大于1时,需要和锁配合使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void* producer(void* arg) {
while(1) {
sem_wait(&semp);

pthread_mutex_lock(&mutex);
struct Node* newNode = (struct Node*)malloc(sizeof(struct Node));
newNode->next = head;
newNode->number = rand() % 1000;
head = newNode;
printf("生产者, id:%ld, number:%d\n",pthread_self(),newNode->number);
pthread_mutex_unlock(&mutex);

sem_post(&semc);
sleep(rand()%3);
}
return NULL;
}

void* cosumer(void* arg) {
while(1) {
sem_wait(&semc);

pthread_mutex_lock(&mutex);
struct Node* node = head;
printf("消费者, id:%ld, number:%d\n", pthread_self(), node->number);
head = head->next;
free(node);
pthread_mutex_unlock(&mutex);

sem_post(&semp);
sleep(rand()%3);
}
return NULL;
}

image-20240318104822374