在了解了《 同步与互斥的区别 》之后,我们来看看几个经典的线程同步的例子。相信通过具体场景可以让我们学会分析和解决这类线程同步的问题,以便以后应用在实际的项目中。
一组生产者进程和一组消费者进程共享一个初始为空、大小为 n 的缓冲区,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待;只有缓冲区不空时,消费者才能从中取出消息,否则必须等待。由于缓冲区是临界资源,它只允许一个生产者放入消息,或者一个消费者从中取出消息。
关系分析:生产者和消费者对缓冲区互斥访问是互斥关系,同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,它们也是同步关系。
整理思路:这里比较简单,只有生产者和消费者两个进程,且这两个进程存在着互斥关系和同步关系。那么需要解决的是互斥和同步的PV操作的位置。
信号量设置:信号量 mutex
作为互斥信号量,用于控制互斥访问缓冲池,初值为1;信号量 full
用于记录当前缓冲池中“满”缓冲区数,初值为 0;信号量 empty
用于记录当前缓冲池中“空”缓冲区数,初值为n。
代码示例:(semaphore类的封装见下文)
#include<iostream> #include<unistd.h> // sleep #include<pthread.h> #include"semaphore.h" using namespace std; #define N 5 semaphore mutex("/", 1); // 临界区互斥信号量 semaphore empty("/home", N); // 记录空缓冲区数,初值为N semaphore full("/home/songlee",0); // 记录满缓冲区数,初值为0 int buffer[N]; // 缓冲区,大小为N int i=0; int j=0; void* producer(void* arg) { empty.P(); // empty减1 mutex.P(); buffer[i] = 10 + rand() % 90; printf("Producer %d write Buffer[%d]: %d/n",arg,i+1,buffer[i]); i = (i+1) % N; mutex.V(); full.V(); // full加1 } void* consumer(void* arg) { full.P(); // full减1 mutex.P(); printf(" /033[1;31m"); printf("Consumer %d read Buffer[%d]: %d/n",arg,j+1,buffer[j]); printf("/033[0m"); j = (j+1) % N; mutex.V(); empty.V(); // empty加1 } int main() { pthread_t id[10]; // 开10个生产者线程,10个消费者线程 for(int k=0; k<10; ++k) pthread_create(&id[k], NULL, producer, (void*)(k+1)); for(int k=0; k<10; ++k) pthread_create(&id[k], NULL, consumer, (void*)(k+1)); sleep(1); return 0; }
编译运行输出结果:
Producer 1 write Buffer[1]: 83 Producer 2 write Buffer[2]: 26 Producer 3 write Buffer[3]: 37 Producer 5 write Buffer[4]: 35 Producer 4 write Buffer[5]: 33 Consumer 1 read Buffer[1]: 83 Producer 6 write Buffer[1]: 35 Consumer 2 read Buffer[2]: 26 Consumer 3 read Buffer[3]: 37 Consumer 4 read Buffer[4]: 35 Consumer 5 read Buffer[5]: 33 Consumer 6 read Buffer[1]: 35 Producer 7 write Buffer[2]: 56 Producer 8 write Buffer[3]: 22 Producer 10 write Buffer[4]: 79 Consumer 9 read Buffer[2]: 56 Consumer 10 read Buffer[3]: 22 Producer 9 write Buffer[5]: 11 Consumer 7 read Buffer[4]: 79 Consumer 8 read Buffer[5]: 11
有读者和写者两组并发线程,共享一个文件,当两个或以上的读线程同时访问共享数据时不会产生副作用,但若某个写线程和其他线程(读线程或写线程)同时访问共享数据时则可能导致数据不一致的错误。因此要求:
关系分析:由题目分析可知,读者和写者是互斥的,写者和写者也是互斥的,而读者和读者不存在互斥问题。
整理思路:写者是比较简单的,它与任何线程互斥,用互斥信号量的 PV 操作即可解决。读者的问题比较复杂,它必须实现与写者的互斥,多个读者还可以同时读。所以,在这里用到了一个计数器,用它来判断当前是否有读者读文件。当有读者的时候写者是无法写文件的,此时读者会一直占用文件,当没有读者的时候写者才可以写文件。同时,不同的读者对计数器的访问也应该是互斥的。
信号量设置:首先设置一个计数器 count
,用来记录当前的读者数量,初值为0;设置互斥信号量 mutex
,用于保护更新 count 变量时的互斥;设置互斥信号量 rw
用于保证读者和写者的互斥访问。
#include<iostream> #include<unistd.h> // sleep #include<pthread.h> #include"semaphore.h" using namespace std; int count = 0; // 记录当前的读者数量 semaphore mutex("/",1); // 用于保护更新count变量时的互斥 semaphore rw("/home",1); // 用于保证读者和写者的互斥 void* writer(void* arg) { rw.P(); // 互斥访问共享文件 printf(" Writer %d start writing.../n", arg); sleep(1); printf(" Writer %d finish writing.../n", arg); rw.V(); // 释放共享文件 } void* reader(void* arg) { mutex.P(); // 互斥访问count变量 if(count == 0) // 当第一个读线程读文件时 rw.P(); // 阻止写线程写 ++count; // 读者计数器加1 mutex.V(); // 释放count变量 printf("Reader %d start reading.../n", arg); sleep(1); printf("Reader %d finish reading.../n", arg); mutex.P(); // 互斥访问count变量 --count; // 读者计数器减1 if(count == 0) // 当最后一个读线程读完文件 rw.V(); // 允许写线程写 mutex.V(); // 释放count变量 } int main() { pthread_t id[8]; // 开6个读线程,2个写线程 pthread_create(&id[0], NULL, reader, (void*)1); pthread_create(&id[1], NULL, reader, (void*)2); pthread_create(&id[2], NULL, writer, (void*)1); pthread_create(&id[3], NULL, writer, (void*)2); pthread_create(&id[4], NULL, reader, (void*)3); pthread_create(&id[5], NULL ,reader, (void*)4); sleep(2); pthread_create(&id[6], NULL, reader, (void*)5); pthread_create(&id[7], NULL ,reader, (void*)6); sleep(4); return 0; }
编译运行的结果如下:
Reader 2 start reading... Reader 1 start reading... Reader 3 start reading... Reader 4 start reading... Reader 1 finish reading... Reader 2 finish reading... Reader 3 finish reading... Reader 4 finish reading... Writer 1 start writing... Writer 1 finish writing... Writer 2 start writing... Writer 2 finish writing... Reader 5 start reading... Reader 6 start reading... Reader 5 finish reading... Reader 6 finish reading...
一张圆桌上坐着 5 名哲学家,桌子上每两个哲学家之间摆了 一根 筷子,桌子的中间是一碗米饭,如图所示:
哲学家们倾注毕生精力用于思考和进餐,哲学家在思考时,并不影响他人。只有当哲学家饥饿的时候,才试图拿起左、右两根筷子(一根一根拿起)。如果筷子已在他人手上,则需等待。饥饿的哲学家只有同时拿到了两根筷子才可以开始进餐,当进餐完毕后,放下筷子继续思考。
关系分析:5名哲学家与左右邻居对其中间筷子的访问是互斥关系。
整理思路:显然这里有 5 个线程,那么要如何让一个哲学家拿到左右两个筷子而不造成死锁或饥饿现象?解决方法有两个,一个是让他们同时拿两个筷子;二是对每个哲学家的动作制定规则,避免饥饿或死锁现象的发生。
信号量设置:定义互斥信号量数组 chopstick[5] = {1,1,1,1,1}
用于对 5 根筷子的互斥访问。
semaphore chopstick[5] = {1,1,1,1,1} // 信号量数组 Pi() // i号哲学家的线程 { do { P(chopstick[i]); // 取左边筷子 P(chopstick[(i+1)%5]); // 取右边筷子 eat; // 进餐 V(chopstick[i]); // 放回左边筷子 V(chopstick[(i+1)%5]); // 放回右边筷子 think; // 思考 }while(1); }
上面的伪代码存在一个问题:当五个哲学家都想要进餐,分别拿起他们左边筷子的时候(都恰好执行完 P(chopstick[i])
),筷子已经被拿光了,等到他们再想拿右边的筷子的时候,就全被阻塞了,这就出现了死锁。
为了防止死锁的发生,可以对哲学家线程施加一些限制条件,比如:
这里,我们采用第二种方法来改进上面的算法,即当一个哲学家左右两边的筷子都可用时,才允许他抓起筷子。
#include<iostream> #include<vector> #include<unistd.h> // sleep #include<pthread.h> #include"semaphore.h" using namespace std; vector<semaphore*> chopstick; // 信号量数组 semaphore mutex("/", 1); // 设置取左右筷子的信号量 <-- 关键 void* P1(void* arg) // 第1个哲学家线程 { mutex.P(); // 在取筷子前获得互斥量 chopstick[0]->P(); // 取左边筷子 chopstick[1]->P(); // 取右边筷子 mutex.V(); // 释放取筷子的信号量 printf("Philosopher 1 eat./n"); chopstick[0]->V(); // 放回左边筷子 chopstick[1]->V(); // 放回右边筷子 } void* P2(void* arg) // 第2个哲学家线程 { mutex.P(); // 在取筷子前获得互斥量 chopstick[1]->P(); // 取左边筷子 chopstick[2]->P(); // 取右边筷子 mutex.V(); // 释放取筷子的信号量 printf("Philosopher 2 eat./n"); chopstick[1]->V(); // 放回左边筷子 chopstick[2]->V(); // 放回右边筷子 } void* P3(void* arg) // 第3个哲学家线程 { mutex.P(); // 在取筷子前获得互斥量 chopstick[2]->P(); // 取左边筷子 chopstick[3]->P(); // 取右边筷子 mutex.V(); // 释放取筷子的信号量 printf("Philosopher 3 eat./n"); chopstick[2]->V(); // 放回左边筷子 chopstick[3]->V(); // 放回右边筷子 } void* P4(void* arg) // 第4个哲学家线程 { mutex.P(); // 在取筷子前获得互斥量 chopstick[3]->P(); // 取左边筷子 chopstick[4]->P(); // 取右边筷子 mutex.V(); // 释放取筷子的信号量 printf("Philosopher 4 eat./n"); chopstick[3]->V(); // 放回左边筷子 chopstick[4]->V(); // 放回右边筷子 } void* P5(void* arg) // 第5个哲学家线程 { mutex.P(); // 在取筷子前获得互斥量 chopstick[4]->P(); // 取左边筷子 chopstick[0]->P(); // 取右边筷子 mutex.V(); // 释放取筷子的信号量 printf("Philosopher 5 eat./n"); chopstick[4]->V(); // 放回左边筷子 chopstick[0]->V(); // 放回右边筷子 } int main() { semaphore *sem1 = new semaphore("/home", 1); semaphore *sem2 = new semaphore("/home/songlee", 1); semaphore *sem3 = new semaphore("/home/songlee/java", 1); semaphore *sem4 = new semaphore("/home/songlee/ADT", 1); semaphore *sem5 = new semaphore("/home/songlee/Test", 1); chopstick.push_back(sem1); chopstick.push_back(sem2); chopstick.push_back(sem3); chopstick.push_back(sem4); chopstick.push_back(sem5); pthread_t id; pthread_create(&id, NULL, P1, NULL); pthread_create(&id, NULL, P2, NULL); pthread_create(&id, NULL, P3, NULL); pthread_create(&id, NULL, P4, NULL); pthread_create(&id, NULL, P5, NULL); sleep(1); delete sem1; delete sem2; delete sem3; delete sem4; delete sem5; return 0; }
编译运行的结果如下:
Philosopher 2 eat. Philosopher 1 eat. Philosopher 3 eat. Philosopher 4 eat. Philosopher 5 eat.
注意:创建信号量时的 路径参数 请改成你的系统中存在的路径!!!
上面的代码中都使用了这个 semaphore
类,实现如下:
#pragma once #include<iostream> #include<cstdio> #include<cstdlib> #include<sys/sem.h> using namespace std; // 联合体,用于semctl初始化 union semun { int val; /*for SETVAL*/ struct semid_ds *buf; unsigned short *array; }; class semaphore { private: int sem_id; int init_sem(int); public: semaphore(const char*, int); /*构造函数*/ ~semaphore(); /*析构函数*/ void P(); /*P操作*/ void V(); /*V操作*/ };
#include"semaphore.h" semaphore::semaphore(const char* path, int value) { key_t key; /*获取key值*/ if((key = ftok(path, 'z')) < 0) { perror("ftok error"); exit(1); } /*创建信号量集,其中只有一个信号量*/ if((sem_id = semget(key, 1, IPC_CREAT|0666)) == -1) { perror("semget error"); exit(1); } init_sem(value); } semaphore::~semaphore() { union semun tmp; if(semctl(sem_id, 0, IPC_RMID, tmp) == -1) { perror("Delete Semaphore Error"); exit(1); } } void semaphore::P() { struct sembuf sbuf; sbuf.sem_num = 0; /*序号*/ sbuf.sem_op = -1; /*P操作*/ sbuf.sem_flg = SEM_UNDO; if(semop(sem_id, &sbuf, 1) == -1) { perror("P operation Error"); } } void semaphore::V() { struct sembuf sbuf; sbuf.sem_num = 0; /*序号*/ sbuf.sem_op = 1; /*V操作*/ sbuf.sem_flg = SEM_UNDO; if(semop(sem_id, &sbuf, 1) == -1) { perror("V operation Error"); } } // 初始化信号量 int semaphore::init_sem(int value) { union semun tmp; tmp.val = value; if(semctl(sem_id, 0, SETVAL, tmp) == -1) { perror("Init Semaphore Error"); return -1; } return 0; }
在这里,要创建不同的信号量,必须传递不同的路径参数(这样获取的 key 值才会不一样)。
注意,本文的关注点并不在于 linux 下如何创建信号量以及如何封装起来才更方便,而是通过几个经典的同步实例,了解在多线程环境下如何解决这类线程同步问题。