信号量

在之前我们讨论了并发问题,多线程并发导致资源竞争,可能会引发意想不到的错误所以引入同步来协调多线程对共享数据的访问,任何时刻只能由一个线程执行临界区代码。确保同步正确可以通过底层硬件支持,也可以通过高层次抽象。高层次抽象中比较典型的两种则为信号量和管程。

基本同步方法

自旋锁为什么无法按先来先服务方式使用资源?

原因:自旋锁是由TS指令实现的临界区申请操作,第一个检测到临界区空闲的申请者而不是第一个开始检测的申请者进入。也就是说,访问顺序是由硬件随机决定的。如果要实现FIFO方式,一般都需要一个队列。

信号量

信号量是操作系统提供的一种协调共享资源访问的方法

  • 软件同步是平等线程间的一种同步协商机制
  • OS是管理者,由操作系统仲裁谁来使用资源,地位高于进程
  • 用信号量来表示系统资源的数量

信号量与软件同步区别

软件同步是平等线程间的一种同步协商机制;

信号量是由地位高于进程的管理者OS协调的同步机制

信号量组成

由一个整形(sem)变量和两个原子操作组成

P():sem减1

  • 若sem < 0, 进入等待,否则继续

V():sem加1

  • sem <= 0,唤醒一个等待进程

信号量的特性

信号量是被保护的整数变量

  • 初始化完成后,只能通过P()V() 操作修改
  • 由操作系统保证PV操作是原子操作
  • P() 可能阻塞(没有资源,处于等待状态)
  • V() 不会阻塞(释放资源,唤醒等待状态的进程)

通常假定信号量是公平的

  • 线程不会被无限期阻塞在 P() 操作(实际系统中有一个最长时限的参数,超时之后错误返回)
  • 假定信号量等待先进先出(但是在实际系统中公平有所偏差)

信号量的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Semaphore {
int sem;
WaitQueue q;
}
Semaphore::P() {
sem--;
if (sem < 0) {
Add this thread t to q;
block(p);
}
}
Semaphore::V() {
sem++;
if (sem <= 0) {
Remove a thread t from q;
wakeup(t);
}
}

若将程序修改为以下形式,即互换P操作的sem修改与sem条件判断的顺序,则无法确保互斥访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
P(){ 
if (sem <= 0) {
q. enqueue(t);
block(t);
}
sem--;
}

V() {
sem++;
if (!q.empty()) {
t = q.dequeue();
wakeup(t);
}
}

比如,一个例子:以下以Pn表示进程n执行P操作,Vn表示进程n执行V操作

P1,sem=0,q=[],1进入临界区

P2,sem=0,q=[2],2被阻塞,不能执行sem–

V1,sem=1,q=[],2被唤醒,此时2处于就绪态,当进程调度时,如果没有马上调度2,比如调度了3,执行P3,3发现sem=1,于是直接跳到sem–并且进入临界区;而3在临界区的过程中,若发生进程切换,切到2时,2执行sem–,sem=-1,并且2也进入临界区。这就导致会有多个进程进入临界区的情况。因此这种做法是不可取的。

若将if修改为while则可解决互斥访问的问题,但是无法保证按照FIFO方式申请信号量。

  • 一个线程A调用P()原语时,由于线程B正在使用该信号量而进入阻塞状态;注意,这时value的值为0。
  • 线程B放弃信号量的使用,线程A被唤醒而进入就绪状态,但没有立即进入运行状态;注意,这里value为1。
    在线程A处于就绪状态时,处理机正在执行线程C的代码;线程C这时也正好调用P()原语访问同一个信号量,并得到使用权。注意,这时value又变回0。
  • 线程A进入运行状态后,重新检查value的值,条件不成立,又一次进入阻塞状态。
  • 至此,线程C比线程A后调用P原语,但线程C比线程A先得到信号量。

信号量的分类

  • 二进制信号量,资源数目为 0 或 1
  • 资源信号量,可为任何非负值

两者等价,基于一个可以实现另一个

信号量的使用

互斥访问

  • 临界区的互斥访问控制

条件同步

  • 线程间事件等待

互斥访问

1
mutex = new Semaphore(1);

每类资源设置一个信号量,初值为 1

1
2
3
mutex->P();
Critical Section;
mutex->V();

必须成对使用 P()V() 操作

不申请直接释放,多个线程进入缓冲区

申请不释放,缓冲区无线程,但谁也进不去

  • P() 保证互斥访问临界资源
  • V() 在使用后释放临界资源
  • PV操作不能次序错误、重复 、遗漏

条件同步

1
condition = new Semaphore(0);

条件同步设置一个信号量,初值为 0

线程a执行P操作后信号量为负值,进入等待状态,线程B执行V操作后,信号量又回到0,此时线程a可以继续往下执行,通过这种方式实现条件同步。

什么是信号量?它与软件同步方法的区别在什么地方?

信号量是由操作系统提供的一种协调共享资源访问的方法。信号量是一种抽象数据类,由一个被保护的整形变量(sem)和P()、V()两个原子操作组成,表示系统资源的数量。

区别:

  • 软件同步是平等线程间的一种同步协商机制;
  • 信号量是由地位高于进程的管理者OS协调的同步机制。

管程

用于多线程互斥访问共享资源的程序结构

  • 采用面向对象方法,简化了线程间的同步控制
  • 任一时刻最多只有一个线程执行管程代码
  • 正在管程中的线程可临时放弃管程的互斥访问,等待事件出现时恢复

管程与临界区有什么异同?

相同点:在任一时刻最多只有一个线程执行管程代码或临界区代码;

不同:正在管程中的线程可临时放弃管程的互斥访问,等待事件出现时恢复;而临界区不支持临时退出;

管程的使用

  • 在对象/模块中收集相关共享数据
  • 定义访问共享数据的方法

管程组成

  • 一个锁
    • 控制管程代码的互斥访问(入口)
  • 0或者多个条件变量
    • 管理共享数据的并发访问
  • 局部数据变量只能被管程的过程访问
  • 一个进程通过调用管程的一个过程进入管程
  • 在任何时候,只能有一个进程在管程中执行

条件变量

条件变量为管程内的等待机制

  • 进入管程的线程因资源被占用而进入等待状态
  • 每个条件变量表示一种等待原因,对应一个等待队列
  • Wait()操作
    • 将自己阻塞在等待队列中
    • 唤醒一个等待者或释放管程的互斥访问
  • Signal()操作
    • 将等待队列中的一个线程唤醒
    • 如果等待队列为空则等同于空操作

条件变量实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Class Condition {
int numWaiting = 0; // 等待线程数
WaitQueue q;
}
Condition::Wait(lock){
numWaiting++; // 有线程处于等待状态
Add this thread t to q;
release(lock); // 释放管程的访问权
schedule(); //need mutex,执行调度
require(lock); // 请求管程访问权
}
Condition::Signal(){
if (numWaiting > 0) {
// 如果等待队列为空,则为空操作
Remove a thread t from q;
wakeup(t); //need mutex
numWaiting--;
}
}

信号量初值与资源数相同,而条件变量初值为0。

条件变量的实现和信号量类似,但是有些区别,参考mtu课件cs241,wiki,总结如下

  • 对管程的互斥锁的释放和获得
  • signal和V语义的不同:PV操作必须是成对的,但signal/wait操作完全不需要保证这一点
  • wait和P语义的不同:V操作后线程可能会继续执行,但wait操作后,线程必然进入等待队列并阻塞
  • 执行signal/wait时,都默认已经获得了互斥锁

信号量和条件变量是并发问题的两种处理模型。

信号量将并发的问题抽象为有限的资源,用计数器表示,资源足够时往下走,不够时等待。

条件变量则将并发的问题抽象为事件,当满足某种事件的时候,往下走,不满足某种事件的时候暂时放弃锁。

管程语义

参考cmu课件piazza,考虑以下情况:线程A在条件变量的等待队列中等待资源,此时线程B在该条件变量上执行signal()操作,根据处理方式分为:

  1. mesa管程
  2. hansen管程
  3. hoare管程

mesa管程

  • 线程B执行signal之后,不会立刻退出管程,而是执行到lock.release()之后才进入就绪态
  • 线程A会被移动到入口等待队列中
  • 在wait后被唤醒的进程不一定会被立刻调度,因此需要用while来检查条件
  • 大部分实际实现的管程采用的是这一语义

hoare管程

  • 线程B执行signal之后,迅速唤醒等待中的线程A,自己进入signal队列中(这个队列是此语义特有的)
  • 每次有线程退出时,先到signal队列中调度线程,如果signal队列空,才到入口等待队列调度线程
  • 实际实现中一般不采用,因为需要多一个队列,代价增大了

hansen管程

线程B退出的同时才执行signal操作

Mesa管程:占用管程的当前进程可在任何时刻释放占用资源并唤醒相应的等待进程,当前进程继续执行,被唤醒放回入口队列队首等待当前进程释放管程访问权;

Hoare管程:占用管程的当前进程可在任何时刻释放占用资源并唤醒相应的等待进程,当前进程进入唤醒队列等待,被唤醒进程继续执行直到释放管程访问权;管程空闲时,优先查看唤醒队列中的等待进程,唤醒队列中没有等待进程时再查看入口队列;

Hansen管程:占用管程的当前进程只在退出管程时释放占用资源并唤醒相应的等待进程,被唤醒进程继续执行直到释放管程访问权;

条件判断中while和if对释放处理中的执行顺序影响:
在Hansen和Mesa管程中,由于条件变量释放操作signal时并没有立即放弃管程访问权,资源的可用状态可能变化,需使用while()进行重新检查;
在Hoare管程中,由于条件变量释放操作signal同时表示立即放弃管程访问权,资源的可用状态保持不变,可使用if判断,不需要再次检查。

依据目前的理解,Hansen和Mesa管程在程序行为效果上来看,它们是一致的。

哲学家就餐问题

方案一

1
2
3
4
5
6
7
8
9
10
11
12
13
#define N 5 // 哲学家个数
semaphore fork[5]; // 互斥操作,信号量初值为1
void philosopher(int i) {
// 哲学家编号:0 - 4
while (ture) {
think();
P(fork[i]); // 拿左边的叉子
P(fork[(i + 1) % N]); // 拿右边的叉子
eat();
V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子
}
}

不正确,有可能5个人同时拿左边叉子,都拿不到右边叉子,形成死锁。

方案二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
semaphore mutex; // 互斥信号量,初值1
void philosopher(int i)
// 哲学家编号:0 - 4
while(TRUE) {
think();
P(mutex); // 进入临界区 只有一个哲学家能就餐

P(fork[i]); // 去拿左边的叉子
P(fork[(i + 1) % N]);// 去拿右边的叉子
eat();
V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子

V(mutex); // 退出临界区
}

互斥访问正确,但任何时间只有一个哲学家就餐,叉子可满足两位哲学家同时就餐,性能差。

方案三

  • 和方案1一样,使用5个信号量表示筷子
  • 哲学家根据编号不同,拿取筷子的顺序不同,从而避免都拿到左边刀叉而等待右边刀叉形成循环等待的情况
  • 此时没有死锁,且允许两个人同时就餐

信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
void philosopher(int i) // 哲学家编号:0 - 4
while(TRUE) {
think();

if (i % 2 == 0) {
// 偶数 先拿左 后拿右 奇数 先拿右 后拿左
P(fork[i]); // 去拿左边的叉子
P(fork[(i + 1) % N]); // 去拿右边的叉子
} else {
P(fork[(i + 1) % N]); // 去拿右边的叉子
P(fork[i]); // 去拿左边的叉子
}
eat();

V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子
}

读者写者问题

共享资源的两类使用者

  • 读者 只读取数据,不修改
  • 写者 读取和修改数据
  • 读读允许
    • 同一时刻允许有多个读者同时读
  • 读写互斥
    • 没有写者时,读者才能读
    • 没有读者时,写者才能写
  • 写写互斥
    • 没有其他写者时,写者才能写

信号量

  • 信号量WriteMutex
    • 控制读写操作互斥
    • 初始化为 1
  • 读者计数Rcount
    • 正在进行读操作的读者数目
    • 初始化为 0
  • 信号量CountMutex
    • 控制对读者计数的互斥修改(保护读者计数)
    • 初始化为 1

writer

1
2
3
P(WriteMutex);
write();
V(WriteMutex);

reader

1
2
3
4
5
6
7
8
9
10
11
12
13
P(CountMutex); // 保护 Rcount
if (Rcount == 0)
P(WriteMutex);
//若为当前第一个读者,开启读写互斥
++Rcount;
V(CountMutex);
read();
P(CountMutex);
--Rcount;
//若为当前最后一个读者,释放互斥访问权限
if (Rcount == 0)
V(WriteMutex);
V(CountMutex);

读者写者问题的优先策略

  • 读者优先策略
    • 只要有读者正在读状态,后来的读者都能直接进入
    • 若读者持续不断进入,则写者就处于饥饿
  • 写者优先策略
    • 只要有写者就绪,写者应尽快执行写操作
    • 若写者持续不断就绪,则读者就处于饥饿

管程

管程的状态变量

1
2
3
4
5
6
7
AR = 0; // 正在读的读者
AW = 0; // 正在写的写者
WR = 0; // 等待读的读者
WW = 0; // 等待写的写者
Lock lock;
Condition okToRead;
Condition okToWrite;

reader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Database::Read() {
// Wait until no writers;
StartRead();
read database;
// check out – wake up waiting writers;
DoneRead();
}
Database::StartRead() {
lock.Acquire();
while ((AW+WW) > 0) {
//写者优先,只要有写者就等待
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}
Database::DoneRead() {
lock.Acquire();
AR--;
if (AR == 0 && WW > 0) {
// 当前没有读者并有等待写的写者 则唤醒写者
okToWrite.signal();
}
lock.Release();
}

writer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Database::Write() {
// Wait until no readers/writers;

StartWrite();
write database;
// check out-wake up waiting readers/writers;
DoneWrite();
}
Database::StartWrite() {
lock.Acquire();
while ((AW+AR) > 0) {
//写者优先,有正在写的写着或正在读的读者则等待
WW++;
okToWrite.wait(&lock);
WW--;
}
AW++;
lock.Release();
}
Database::DoneWrite() {
lock.Acquire();
AW--;
if (WW > 0) {
// 优先唤醒等待写的写者
okToWrite.signal();
}
else if (WR > 0) {
// 如果没有等待写的写者 才唤醒等待读的读者
okToRead.broadcast();
}
lock.Release();
}

while中的判断条件可根据优先策略进行调整,例子采取了写者优先策略。

以上管程在读/写操作时并没有申请互斥信号量,因为在之前申请互斥信号量时将正在/等待读/写的计数等设置完成,此时可确保读写正常进行。

生产者消费者问题

信号量

问题分析

  • 任何时刻只能有一个线程操作缓冲区 (互斥访问)
  • 缓冲区空时,消费者必须等待生产者 (条件同步)
  • 缓冲区满时,生产者必须等待消费者 (条件同步)

用信号量描述每个约束

  • 二进制信号量(mutex) 描述互斥访问
  • 资源信号量(fullBuffers) 描述缓冲区是否有数据
  • 资源信号量(emptyBuffers) 描述缓冲区是否有空余区域

其中P操作之间的顺序不可颠倒,会引起线程阻塞,V操作不会阻塞,顺序无所谓。

信号量不足

  • 读/开发代码比较困难
    • 程序员需要掌握信号量机制
  • 容易出错
    • 使用的信号量已被另一个线程占用
    • PV操作必须成对出现
  • 不能处理死锁问题

为什么在生产者-消费者问题中先申请互斥信号量会导致死锁?

如果先申请互斥信号量,后申请资源信号量,则在两种情况下可能会出现循环等待:

  • 生产者获得互斥信号量后检查emptyBuffers资源信号量,发现缓冲区满了,于是进入睡眠状态;此时消费者无法获得互斥信号量,于是无法消耗缓冲区内的资源
  • 消费者获得互斥信号量后检查fullBuffers资源信号量,发现缓冲区空了,于是进入睡眠状态;此时生产者无法获得互斥信号量,于是无法将资源放入缓冲区内

管程

1
2
3
4
5
6
7
8
class BoundedBuffer {
Lock lock;
// 管程入口的锁
int count = 0;
// 缓冲区数据计数
Condition notFull, notEmpty;
// 条件变量
}

先申请锁再检查条件不存在问题,因为管程不成功时可以放弃互斥访问权限,而信号量则会引起死锁。

管程将PV操作集中到一个模块中,简化和降低同步机制的实现难度。

信号量和管程实现的对比

信号量中存有int变量sem以及WaitQueue变量q,根据信号量的实现代码,我们可以得出semq的含义:

  • q代表当前正在等待资源的线程组成的等待队列,若当前资源足够所有进程使用,q为空;
  • sem代表【到目前为止,若所有请求该资源的线程都能够获取该资源,那么资源还剩下多少(这里我们假设资源个数可以为负)】;
  • sem也可以有另一种理解:当sem非负时,sem代表剩余资源的个数;当sem为负数时,sem的绝对值代表等待队列q的长度。

而当我们使用条件变量解决有限资源问题时,我们通常会在条件变量之外,管程之中加入整型变量count,来帮助条件变量记录当前剩余多少资源(非负)。查看条件变量的实现代码,我们可以得出条件变量中整型变量numWaiting以及WaitQueue变量q的含义:

  • q代表当前正在等待资源的线程组成的等待队列,若当前资源足够所有进程使用,q为空;
  • numWaiting代表等待队列q的长度(非负)。

结合使用信号量以及条件变量解决有限资源问题的实例,以及以上我们对信号量和条件变量的分析,我们可以得出以下结论:

  • 在任一状态,信号量中的q和条件变量中的q完全相同;
  • sem非负时,含义与管程中的count相同,此时numWaiting为0;
  • sem为负数时,sem的绝对值等于numWaiting,此时count为0。

在生产者-消费者这个问题实例中:

  • 信号量emptyBuffers与条件变量notFull是匹配的,满足上面3个条件,此时count在代码中以n - count的形式出现;
  • 信号量fullBuffers与条件变量notEmpty是匹配的,满足上面3个条件,此时count在代码中以count的形式出现;

综上所述,两种解决方法是完全等价的,至于为什么用管程实现更加安全方便,个人认为老师在视频中并没有解释得很清楚,和老师讨论后得出结论如下:

  • 用信号量的时候,所有信号量都要自己维护,并配对好PV;使用条件变量也要根据条件配对好Wait和Signal函数,但是信号量机制允许把PV操作放在任何代码中,而管程只允许把PV操作放在管程内部。
  • 用管程的时候,我们可以理解为BoundedBuffer继承了一个管程类,因此操作系统会给BoundedBuffer中每一个方法自动加上锁(即lock->Acquire()lock->Release()函数并不用自己写,是系统加上的),因此更加安全可控,容易查错。

ucore lab7中实现信号量的sem值非负,这样看来ucore中信号量的sem值和条件变量中的count值应该是完全相等的。

同时piazza上另一解释为:

信号量将并发的问题抽象为有限的资源,用计数器表示,资源足够时往下走,不够时等待。

条件变量则将并发的问题抽象为事件,当满足某种事件的时候,往下走,不满足某种事件的时候暂时放弃锁。

生产者消费者问题中,可将条件变量的事件设置为“队列满”、“队列空”,那么就如同信号量那样处理了,所以让人看起来觉得没有区别。