信号量
信号量其实是一个整型的计数器,主要用于实现进程间的互斥与同步,而不是用于缓存进程间通信的数据。
信号量表示资源的数量
控制信号量的方式有两种原子操作
- 一个是 P 操作,这个操作会把信号量减去 -1,相减后如果信号量 < 0,则表明资源已被占用,进程需阻塞等待;相减后如果信号量 >= 0,则表明还有资源可使用,进程可正常继续执行。
- 另一个是 V 操作,这个操作会把信号量加上 1,相加后如果信号量 <= 0,则表明当前有阻塞中的进程,于是会将该进程唤醒运行;相加后如果信号量 > 0,则表明当前没有阻塞中的进程;
P 操作是用在进入共享资源之前,V 操作是用在离开共享资源之后,这两个操作是必须成对出现的。
接下来,举个例子,如果要使得两个进程互斥访问共享内存,我们可以初始化信号量为 1。
具体过程
- 进程 A 在访问共享内存前,先执行了 P 操作,由于信号量的初始值为 1,故在进程 A 执行 P 操作后信号量变为 0,表示共享资源可用,于是进程 A 就可以访问共享内存。
- 若此时,进程 B 也想访问共享内存,执行了 P 操作,结果信号量变为了 -1,这就意味着临界资源已被占用,因此进程 B 被阻塞。
- 直到进程 A 访问完共享内存,才会执行 V 操作,使得信号量恢复为 0,接着就会唤醒阻塞中的线程 B,使得进程 B 可以访问共享内存,最后完成共享内存的访问后,执行 V 操作,使信号量恢复到初始值 1。
可以发现,信号初始化为 1,就代表着是互斥信号量,它可以保证共享内存在任何时刻只有一个进程在访问,这就很好的保护了共享内存。
另外,在多进程里,每个进程并不一定是顺序执行的,它们基本是以各自独立的、不可预知的速度向前推进,但有时候我们又希望多个进程能密切合作,以实现一个共同的任务。
例如,进程 A 是负责生产数据,而进程 B 是负责读取数据,这两个进程是相互合作、相互依赖的,进程 A 必须先生产了数据,进程 B 才能读取到数据,所以执行是有前后顺序的。
那么这时候,就可以用信号量来实现多进程同步的方式,我们可以初始化信号量为 0。
具体过程
- 如果进程 B 比进程 A 先执行了,那么执行到 P 操作时,由于信号量初始值为 0,故信号量会变为 -1,表示进程 A 还没生产数据,于是进程 B 就阻塞等待;
- 接着,当进程 A 生产完数据后,执行了 V 操作,就会使得信号量变为 0,于是就会唤醒阻塞在 P 操作的进程 B;
- 最后,进程 B 被唤醒后,意味着进程 A 已经生产了数据,于是进程 B 就可以正常读取数据了。
可以发现,信号初始化为 0,就代表着是同步信号量,它可以保证进程 A 应在进程 B 之前执行。
一些同步模型
消费者与生产者模型
模型解释
所谓生产者与消费者模型,就是指有一些生产者,每次会生产一些产品(数据)到容器里面(储存数据的数据结构),一群消费者对容器里的产品进行消费(对数据进行处理),如图
代码实现
而为了实现线程的临界区的数据安全,所以可以用mutex(互斥锁)来保证数据的安全
消费者与生产者模型简单代码(mutex实现)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
#include <iostream> using namespace std; #include <pthread.h> #include <unistd.h>
class Node { public: Node(int id) { this->id=id; next=NULL; } int id; Node *next; };
pthread_mutex_t mutex1;
int i=1; Node *head=NULL;
void *Produce(void *) { while(1) { pthread_mutex_lock(&mutex1); Node *newnode=new Node(i); if(head==NULL) { head=newnode; } else { newnode->next=head; head=newnode; } cout<<pthread_self()<<" "<<"已添加数据为:"<<i<<"的产品!"<<endl; i++; pthread_mutex_unlock(&mutex1); sleep(1); } return NULL; }
void *Consume(void *) {
while(1) { pthread_mutex_lock(&mutex1); if(head==NULL) { break; } Node *p=head; while(p!=NULL&&p->next!=NULL) { p=p->next; } Node *p1=head; while(p1!=NULL&&p1->next!=p) { p1=p1->next; } if(p==head) { cout<<pthread_self()<<" "<<"已删除数据为:"<<p->id<<"的产品!"<<endl; head=NULL; } else { cout<<pthread_self()<<" "<<"已删除数据为:"<<p->id<<"的产品!"<<endl; p1->next=NULL; } pthread_mutex_unlock(&mutex1); sleep(1); } pthread_mutex_unlock(&mutex1); return NULL; } int main() { pthread_mutex_init(&mutex1,NULL);
pthread_t produce[5],consume[5]; for(int i=0;i<5;i++) { pthread_create(&produce[i],NULL,Produce,NULL); pthread_create(&consume[i],NULL,Consume,NULL); }
for(int i=0;i<5;i++) { pthread_detach(produce[i]); pthread_detach(consume[i]); }
while(1) { sleep(10); }
pthread_mutex_destroy(&mutex1);
pthread_exit(NULL); return 0; }
|
还有一种方法就是使用信号量来实现资源的互斥与同步啦,在上面的代码中,我们是利用判断链表是否为空来判断有无产品可以让消费者消费,但是在信号量中,我们就可以省略这一步,直接依靠信号量来实现数据的同步
消费者与生产者模型简单代码(sem实现)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
#include <iostream> using namespace std; #include <pthread.h> #include <unistd.h>
class Node { public: Node(int id) { this->id=id; next=NULL; } int id; Node *next; };
sem_t *sem; sem_t *produce; sem_t *consume;
int i=1; Node *head=NULL;
void *Produce(void *) { while(1) { sem_wait(produce); sem_wait(sem); Node *newnode=new Node(i); if(head==NULL) { head=newnode; } else { newnode->next=head; head=newnode; } cout<<pthread_self()<<" "<<"已添加数据为:"<<i<<"的产品!"<<endl; i++; sleep(1); sem_post(sem); sem_post(consume); } return NULL; }
void *Consume(void *) {
while(1) { sem_wait(consume); sem_wait(sem); Node *p=head; while(p!=NULL&&p->next!=NULL) { p=p->next; } Node *p1=head; while(p1!=NULL&&p1->next!=p) { p1=p1->next; } if(p==head) { cout<<pthread_self()<<" "<<"已删除数据为:"<<p->id<<"的产品!"<<endl; head=NULL; } else { cout<<pthread_self()<<" "<<"已删除数据为:"<<p->id<<"的产品!"<<endl; p1->next=NULL; } sem_post(sem); sem_post(produce); sleep(1); } return NULL; } int main() { sem_init(sem,0,1); sem_init(produce,0,5); sem_init(consume,0,5);
pthread_t produce[5],consume[5]; for(int i=0;i<5;i++) { pthread_create(&produce[i],NULL,Produce,NULL); pthread_create(&consume[i],NULL,Consume,NULL); }
for(int i=0;i<5;i++) { pthread_detach(produce[i]); pthread_detach(consume[i]); }
while(1) { sleep(10); }
sem_destroy(sem); sem_destroy(produce); sem_destroy(consume);
pthread_exit(NULL); return 0; }
|
哲学家进餐模型
模型解释
先来看看哲学家就餐的问题描述:
5
个老大哥哲学家,闲着没事做,围绕着一张圆桌吃面;
- 巧就巧在,这个桌子只有 5 支叉子,每两个哲学家之间放一支叉子;
- 哲学家围在一起先思考,思考中途饿了就会想进餐;
- 奇葩的是,这些哲学家要两支叉子才愿意吃面,也就是需要拿到左右两边的叉子才进餐;
- 吃完后,会把两支叉子放回原处,继续思考;
那么问题来了,如何保证哲学家们的动作有序进行,而不会出现有人永远拿不到叉子呢?
方法一
可以用锁来实现,但因为有很多勺子,所以在此基础上修改一下,改用信号量来实现
1 2 3 4 5 6 7 8 9 10 11 12
| #define N 5 sem_t fork[5]
void smart_person(int i) { think(); sem_wait(fork[i]); sem_wait(fork[(i+1)%N]) eat(); sem_post(fork[i]); sem_post(fork[(i+1)%N]) }
|
但这就会出现一种极端情况,那就是如果每个人都拿了左边的勺子,那么此时就会出现死锁,谁也无法走到第二步(也就是去拿右边的勺子,因为此时已经被他左边的人拿了)
方法二
为了解决方法一出现的问题,我们可以当只要有一个哲学家思考结束准备开始吃饭的时候就对所有勺子上锁(也可以是信号量),代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| #define N 5 sem_t *fork[5]; sem_t *sem; void smart_person(int i) { think(); sem_wait(sem) sem_wait(fork[i]); sem_wait(fork[(i+1)%N]) eat(); sem_post(fork[i]); sem_post(fork[(i+1)%N]) sem_post(sem) }
|
这样是不会出现死锁的情况了,但是这样就会导致一次只能有一个哲学家进餐,但实际上我们知道,同一时间可以有两个哲学家进餐的
方法三
那既然方案二使用互斥信号量,会导致只能允许一个哲学家就餐,那么我们就不用它。
另外,方案一的问题在于,会出现所有哲学家同时拿左边刀叉的可能性,那我们就避免哲学家可以同时拿左边的刀叉,采用分支结构,根据哲学家的编号的不同,而采取不同的动作。
即让偶数编号的哲学家「先拿左边的叉子后拿右边的叉子」,奇数编号的哲学家「先拿右边的叉子后拿左边的叉子」
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #define N 5 sem_t fork[5]
void smart_person(int i) { think(); if(i%2==0) { sem_wait(fork[i]); sem_wait(fork[(i+1)%N]) } else { sem_wait(fork[(i+1)%N]) sem_wait(fork[i]); } eat(); sem_post(fork[i]); sem_post(fork[(i+1)%N]) }
|
上面的程序,在 P 操作时,根据哲学家的编号不同,拿起左右两边叉子的顺序不同。另外,V 操作是不需要分支的,因为 V 操作是不会阻塞的
方案四
在这里再提出另外一种可行的解决方案,我们用一个数组state来记录每一位哲学家在进程、思考还是饥饿状态(正在试图拿叉子)
那么,一个哲学家只有在两个邻居都没有进餐时,才可以进入进餐状态
第 i 个哲学家的左邻右舍,则由宏 LEFT 和 RIGHT 定义:
LEFT : ( i + 5 - 1 ) % 5
RIGHT : ( i + 1 ) % 5
比如 i 为 2,则 LEFT 为 1,RIGHT 为 3。
这里就直接贴一张图片吧
上面的程序使用了一个信号量数组,每个信号量对应一位哲学家,这样在所需的叉子被占用时,想进餐的哲学家就被阻塞
读者-写者模型
模型解释
前面的「哲学家进餐问题」对于互斥访问有限的竞争问题(如 I/O 设备)一类的建模过程十分有用。
另外,还有个著名的问题是「读者-写者」,它为数据库访问建立了一个模型。
读者只会读取数据,不会修改数据,而写者即可以读也可以修改数据。
读者-写者的问题描述:
「读-读」允许:同一时刻,允许多个读者同时读
「读-写」互斥:没有写者时读者才能读,没有读者时写者才能写
「写-写」互斥:没有其他写者时,写者才能写
接下来,提出几个解决方案来分析分析,依旧是用信号量来解决问题。
方法一,读者优先
所谓读者优先,意思就是说只要有读者,那么写者就阻塞,直到没有读者了才能继续写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| sem_t *write; int rCount; sem_t *rCountSem;
void Writer() { while(1) { sem_wait(write); write(); sem_post(write); } }
void Reader() { while(1) { sem_wait(rCountSem); if(rCount==0) { sem_wait(write); } rCount++; sem_post(rCountSem);
read();
sem_wait(rCountSem); rCount--; if(rCount==0) { sem_post(write); } sem_post(rCountSem); } }
|
方法二,写者优先
既然有读者优先策略,自然也有写者优先策略。
只要有写者准备要写入,写者应尽快执行写操作,后来的读者就必须阻塞;
如果有写者持续不断写入,则读者就处于饥饿;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| sem_t *write; int wCount; sem_t *wCountSem; sem_t *read; int rCount; sem_t *rCountSem;
void writer() { while(1) { sem_wait(wCountSem); if(wCount==0) { sem_wait(read); } wCount++; sem_post(wCountSem);
sem_wait(write); write(); sem_post(write);
sem_wwait(wCountSem); wCount--; if(wCount==0) { sem_post(read); } sem_post(wCountSem); } }
void Reader() { while(1) { sem_wait(read); sem_wait(rCountSem); if(rCount==0) { sem_wait(write); } rCount++; sem_post(rCountSem); sem_post(read);
read();
sem_wait(rCountSem); rCount--; if(rCount==0) { sem_post(write); } sem_post(rCountSem); } }
|
一开始,当有写者开始写操作的时候,会进行P(read),那么此时后续的读者就无法进行读取操作,但需要注意的是,即使后续的读者无法进行操作,但是如果已经有读者在读取数据了,也会执行P(write)操作,也就意味着写者是无法执行写操作。
方案三,公平策略
优先级相同;
写者、读者互斥访问;
只能一个写者访问临界区;
可以有多个读者同时访问临街资源;、
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| int rCount; sem_t *write; sem_t *flag; sem_t *rCountSem;
void write() { sem_wait(flag); sem_wait(write); write(); sem_post(write); sem_post(flag); }
void read() { sem_wait(flag); sem_wait(rCountSem); if(rCount==0) { sem_wait(write); } rCount++; sem_post(rCountSem); sem_post(flag);
read();
sem_wait(rCountSem); rCount--; if(rCount==0) { sem_post(write); } sem_post(rCountSem); }
|
对比方案一的读者优先策略,可以发现,读者优先中只要后续有读者到达,读者就可以进入读者队列, 而写者必须等待,直到没有读者到达。
没有读者到达会导致读者队列为空,即 rCount==0,此时写者才可以进入临界区执行写操作。
而这里 flag 的作用就是阻止读者的这种特殊权限(特殊权限是只要读者到达,就可以进入读者队列)。
比如:开始来了一些读者读数据,它们全部进入读者队列,此时来了一个写者,执行 P(falg) 操作,使得后续到来的读者都阻塞在 flag 上,不能进入读者队列,这会使得读者队列逐渐为空,即 rCount 减为 0。
这个写者也不能立马开始写(因为此时读者队列不为空),会阻塞在信号量 wDataMutex 上,读者队列中的读者全部读取结束后,最后一个读者进程执行 V(wDataMutex),唤醒刚才的写者,写者则继续开始进行写操作。