同步(基础)

并发的挑战

并发程序的核心挑战在于其执行顺序的不确定性。即使是很简单的代码片段,在多线程环境下,由于线程调度和指令交错执行,可能产生多种不同的最终状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 共享变量
int x = 0, y = 0;

// 线程 T1
void T1() {
x = 1;
int t = y;
// ... 使用 t ...
}

// 线程 T2
void T2() {
y = 1;
int t = x;
// ... 使用 t ...
}

这种不确定性使得我们难以枚举和预测所有可能的状态,尤其是当线程数量增多时。

为何需要同步

并发程序中,并非所有可能的状态都是我们期望的。

  • 不期望的状态:例如,多个线程同时进入临界区,破坏数据一致性。
  • 期望的状态:例如,主线程调用 pthread_join 等待子线程结束后才能继续执行。

为了避免不期望的状态,并确保程序能达到期望的状态,我们需要对线程的执行进行控制。这种控制通常要求线程:

  1. 不能太快:在某个条件满足之前必须等待。例如,线程 B 必须等待线程 A 完成某项操作后才能继续。
  2. 不能太慢:一旦某个条件满足,等待的线程应该能够继续执行(避免死锁或饥饿)。

同步

同步(Synchronization)是指控制并发进程(或线程)的执行,使得它们在变化过程中保持一定的相对关系,以协调它们对共享资源的访问或保证特定的执行顺序。

互斥也是同步的一种特殊形式,它保证了在任意时刻只有一个线程能访问临界区。但同步的概念更广泛,还包括了执行顺序的协调。

简单的互斥锁(如 lock()/unlock())虽然能保证临界区内的原子性,但无法直接控制线程间的执行顺序。

经典同步问题:生产者-消费者

生产者-消费者问题(Producer-Consumer Problem)是并发领域一个非常经典且具有代表性的同步问题,由 Edsger Dijkstra 首次提出。

  • 场景:一个或多个生产者(Producer)线程和一个或多个消费者(Consumer)线程共享一个固定大小的有界缓冲区(Bounded Buffer)。
  • 生产者任务:生成数据项,如果缓冲区未满,则将数据放入缓冲区;如果缓冲区已满,则必须等待。
  • 消费者任务:从缓冲区取走数据项,如果缓冲区非空,则取出数据进行消费;如果缓冲区为空,则必须等待。

graph LR
    P(生产者) -- 生产数据 --> B{有界缓冲区};
    B -- 消费数据 --> C(消费者);

    subgraph 约束条件
        direction LR
        P -- 缓冲区满? --> W1[等待];
        C -- 缓冲区空? --> W2[等待];
    end

    style P fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#9cf,stroke:#333,stroke-width:2px
    style B fill:#ccf,stroke:#333,stroke-width:2px
    style W1 fill:#f99,stroke:#333,stroke-width:1px
    style W2 fill:#f99,stroke:#333,stroke-width:1px

简化模型:打印括号

为了更清晰地理解同步机制,我们将问题简化为打印括号:

  • 生产者:打印左括号 (,相当于向缓冲区放入一个元素。
  • 消费者:打印右括号 ),相当于从缓冲区取出一个元素。
  • 缓冲区:用括号的嵌套深度depth)来表示,容量为 n
  • 合法序列:任何时刻,括号深度 0 <= depth <= n
1
2
void produce() { printf("("); }
void consume() { printf(")"); }

非法序列示例(假设缓冲区容量为 3)

  • (())):非法,消费时 depth 变为 -1(缓冲区为空无法消费)。
  • (((()))):非法,生产第四个 (depth 变为 4(超出缓冲区容量)。

同步目标

我们需要确保:

  • 生产者在 depth < n 时才能打印 (
  • 消费者在 depth > 0 时才能打印 )

这本质上是要求线程在特定条件满足时才能执行,即实现 wait_until 语义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 伪代码
void produce() {
wait_until(depth < n) {
printf("(");
// depth++; // 更新状态
}
}

void consume() {
wait_until(depth > 0) {
printf(")");
// depth--; // 更新状态
}
}

问题在于如何高效且正确地实现 wait_until

尝试与陷阱

  1. 无锁尝试(错误)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 错误:竞态条件
    int n; // 容量
    int depth = 0; // 当前深度

    void produce() {
    while(1) {
    retry:
    // 检查和修改 depth 不是原子操作
    int ready = (depth < n);
    if (!ready) goto retry; // 自旋等待
    printf("(");
    depth++; // 可能与其他线程冲突
    }
    }
    // consume 类似
    • 问题:对共享变量 depth 的读写存在竞态条件,多个线程可能同时读到旧值并进行修改,导致 depth 值不正确,违背安全性。
  2. 加锁尝试 1(错误)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // 错误:条件检查与操作分离
    mutex_t lk;
    int n, depth = 0;

    void produce() {
    while(1) {
    retry:
    mutex_lock(&lk);
    int ready = (depth < n);
    mutex_unlock(&lk); // 释放锁

    // 在 unlock 和 if 之间,depth 可能已被其他线程修改!
    if (!ready) goto retry; // 条件可能已失效

    mutex_lock(&lk);
    printf("(");
    depth++;
    mutex_unlock(&lk);
    }
    }
    // consume 类似
    • 问题:在检查条件(depth < n)和根据条件执行操作(printf, depth++)之间释放了锁。这期间,其他线程可能修改 depth,使得之前检查的条件失效。
  3. 加锁尝试 2(正确但低效)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 正确但低效:忙等待
    mutex_t lk;
    int n, depth = 0;

    void produce() {
    while(1) {
    retry:
    mutex_lock(&lk);
    int ready = (depth < n);
    if (!ready) {
    mutex_unlock(&lk);
    goto retry; // 持有锁时条件不满足,释放锁并重试 (忙等待)
    }
    // 条件满足,持有锁执行操作
    printf("(");
    depth++;
    mutex_unlock(&lk);
    }
    }
    // consume 类似
    • 问题:虽然保证了正确性(检查和操作都在锁的保护下),但在条件不满足时,线程会不断地 lock -> check -> unlock -> retry,形成忙等待或自旋,浪费 CPU 资源。
  4. 阻塞尝试(潜在问题:唤醒丢失)
    为了避免忙等待,我们希望线程在条件不满足时阻塞,让出 CPU,并在条件可能满足时被唤醒。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    // 潜在问题:唤醒丢失(Lost Wakeup)
    mutex_t lk;
    int n, depth = 0;
    queue_t producer_waiting_list, consumer_waiting_list;

    void produce() {
    while(1) {
    retry:
    mutex_lock(&lk);
    int ready = (depth < n);
    if (!ready) {
    mutex_unlock(&lk);
    wait(&producer_waiting_list); // 阻塞生产者
    goto retry;
    }
    printf("(");
    depth++;
    // 缓冲区从空变为非空,可能需要唤醒消费者
    wakeup(&consumer_waiting_list);
    mutex_unlock(&lk);
    }
    }

    void consume() {
    while(1) {
    retry:
    mutex_lock(&lk);
    int ready = (depth > 0);
    if (!ready) {
    mutex_unlock(&lk);
    wait(&consumer_waiting_list); // 阻塞消费者
    goto retry;
    }
    printf(")");
    depth--;
    // 缓冲区从满变为未满,可能需要唤醒生产者
    wakeup(&producer_waiting_list);
    mutex_unlock(&lk);
    }
    }
    • 旧有的问题:唤醒丢失(Lost Wakeup)。考虑生产者 P 和消费者 C:
      1. C 检查 depth > 0,发现为 false。
      2. 在 C 调用 wait 之前,发生上下文切换。
      3. P 执行,增加 depth,调用 wakeup(&consumer_waiting_list)。但此时 C 还没睡,唤醒信号丢失。
      4. C 恢复执行,调用 wait 进入睡眠,可能永远不会被唤醒。
    sequenceDiagram
        participant C as 消费者
        participant OS as 操作系统
        participant P as 生产者
    
        C->>C: mutex_lock()
        C->>C: check depth (== 0) -> ready = false
        Note right of C: 在 wait() 前发生上下文切换
        P->>P: mutex_lock()
        P->>P: 生产,depth++
        P->>OS: wakeup(consumer_list)
        Note left of OS: 无消费者在等待,唤醒丢失
        P->>P: mutex_unlock()
        Note left of C: 上下文切换回消费者
        C->>OS: wait(consumer_list)
        Note right of C: 沉睡,可能永远不会被唤醒
        OS-->>C: Blocked

    为了解决唤醒丢失,需要一种机制能原子地完成「释放锁并进入睡眠」的操作。这引出了条件变量

条件变量(Condition Variable)

条件变量(Condition Variable, CV)是一种同步原语,它允许线程在某个条件不满足时原子地阻塞,并等待该条件被其他线程改变后发出信号而被唤醒。条件变量总是与一个互斥锁关联使用,以保护共享状态(即「条件」)。

条件变量

条件变量本身不存储条件的状态(它没有记忆),它只提供阻塞和唤醒的机制。条件的判断需要程序员显式地在代码中进行。

核心操作

  1. cond_wait(cond_t *cv, mutex_t *lk)
    • 原子操作
      1. 释放传入的互斥锁 lk
      2. 将当前线程加入条件变量 cv 的等待队列,并使其阻塞。
    • 唤醒后:当线程被唤醒时,cond_wait 会在返回前重新获取互斥锁 lk
    • 前提:调用 cond_wait 时,必须已经持有互斥锁 lk
  2. cond_signal(cond_t *cv)
    • 唤醒至少一个(通常是一个)正在 cond_wait(cv, ...) 上等待的线程。
    • 如果没有线程在等待 cv,则此操作无效(信号不会被「保存」)。
    • 建议:调用 cond_signal通常也应持有与 cv 关联的互斥锁 lk,以确保状态修改和信号发送之间的一致性。
  3. cond_broadcast(cond_t *cv)
    • 唤醒所有正在 cond_wait(cv, ...) 上等待的线程。
    • 同样,建议在持有锁时调用。

POSIX API

POSIX 线程库 (pthread) 提供了标准的条件变量 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <pthread.h>

// 声明和初始化
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
// 或者动态初始化
int pthread_cond_init(pthread_cond_t *restrict cv,
const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cv);

// 等待
int pthread_cond_wait(pthread_cond_t *restrict cv,
pthread_mutex_t *restrict mutex);

// 唤醒单个
int pthread_cond_signal(pthread_cond_t *restrict cv);

// 唤醒所有
int pthread_cond_broadcast(pthread_cond_t *restrict cv);

基于 Futex 的简易实现

条件变量可以通过 Linux 的 futex 系统调用实现,以避免唤醒丢失。futex 允许在用户空间进行快速检查,仅在需要阻塞或唤醒时才陷入内核。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 简化的概念性实现(可能存在整数溢出等问题)
typedef struct {
unsigned value; // 用于 futex 的内部状态
// 可能还需要一个等待队列等内部结构
} cond_t;

// 与 CV 关联的互斥锁
mutex_t lk;

void cond_wait(cond_t *cv, mutex_t *lk) {
// 1. 读取当前 futex 值(原子操作)
// 这个值用于检测在释放锁和调用 futex_wait 之间是否有 signal 发生
int current_value = atomic_load(&cv->value);

// 2. 释放互斥锁
mutex_unlock(lk);

// 3. 调用 futex_wait
// 原子地检查 cv->value 是否仍等于 current_value。
// 如果是,则阻塞;如果不是(说明期间有 signal 修改了 value),则立即返回。
futex_wait(&cv->value, current_value);

// 4. 被唤醒后,重新获取互斥锁
mutex_lock(lk);
}

void cond_signal(cond_t *cv) {
// 1. 原子地增加 futex 值(通知潜在的 waiter 状态已变)
atomic_fetch_add(&cv->value, 1);

// 2. 调用 futex_wake 唤醒一个等待者
futex_wake(&cv->value);
}

关键点在于,futex_wait 的原子检查机制确保了即使在 mutex_unlockfutex_wait 调用之间发生了 cond_signal(它会改变 cv->value),futex_wait 也会检测到这种变化并立即返回,从而避免了唤醒丢失。

Signal 语义:Hansen/Mesa vs. Hoare

cond_signal 的行为细节,特别是唤醒线程和锁的交接方式,存在两种主要语义:

  1. Hansen/Mesa 语义(常用,如 POSIX, Java)

    • cond_signal 只是将一个等待线程从等待队列移动到就绪队列,并不保证该线程立即运行,也不直接传递锁。
    • 发出 signal 的线程继续持有锁并执行,直到它释放锁。
    • 被唤醒的线程需要重新竞争获取互斥锁,当它最终获得锁并从 cond_wait 返回时,不能保证之前等待的条件仍然满足(因为其他线程可能在此期间运行并改变了状态)。
    sequenceDiagram
        participant T_Signal as 发信号线程(T1)
        participant T_Wait as 等待线程(T2)
        participant Mutex as 互斥锁
        participant CV as 条件变量
    
        T_Wait->>Mutex: 加锁
        T_Wait->>CV: cond_wait(cv, mutex)
        Mutex-->>T_Wait: 解锁(由 wait 隐式完成)
        CV-->>T_Wait: 阻塞
        T_Signal->>Mutex: 加锁
        T_Signal->>T_Signal: 修改共享状态
        T_Signal->>CV: cond_signal(cv)
        CV-->>T_Wait: 将 T2 从等待队列移动到就绪队列
        T_Signal->>T_Signal: 保持锁并继续执行
        T_Signal->>Mutex: 解锁
        Note over T_Wait: T2 竞争互斥锁
        T_Wait->>Mutex: 加锁(当被调度且成功时)
        Mutex-->>T_Wait: 获得锁
        CV-->>T_Wait: cond_wait 返回
        T_Wait->>T_Wait: 重新检查条件!
  2. Hoare 语义(理论上简洁,实现复杂)

    • cond_signal 立即将锁和 CPU 控制权从发信号的线程(T1)转移给被唤醒的线程(T2)。
    • T1 阻塞,直到 T2 执行完毕并释放锁(或者 T2 再次调用 cond_wait)时,锁和控制权才交还给 T1。
    • 优点:当 T2 从 cond_wait 返回时,可以保证其等待的条件仍然满足(因为没有其他线程能在这期间运行)。
    • 缺点:实现复杂,需要调度器进行特殊处理,可能导致更多的上下文切换。
    sequenceDiagram
        participant T_Signal as 发信号线程(T1)
        participant T_Wait as 等待线程(T2)
        participant Mutex as 互斥锁
        participant CV as 条件变量
    
        T_Wait->>Mutex: 加锁
        T_Wait->>CV: cond_wait(cv, mutex)
        Mutex-->>T_Wait: 解锁(由 wait 隐式完成)
        CV-->>T_Wait: 阻塞
        T_Signal->>Mutex: 加锁
        T_Signal->>T_Signal: 改变共享状态
        T_Signal->>CV: cond_signal(cv)
        Note over T_Signal, T_Wait: 原子地转移锁和 CPU 控制权
        Mutex-->>T_Wait: (从 T1)获取锁
        CV-->>T_Wait: cond_wait 返回
        T_Signal-->>T_Signal: T1 阻塞
        T_Wait->>T_Wait: 执行临界区(条件成立)
        T_Wait->>Mutex: 解锁
        Note over T_Signal, T_Wait: 将锁和控制权还给 T1
        Mutex-->>T_Signal: (从 T2)获取锁
        T_Signal-->>T_Signal: T1 恢复

现实世界的选择

由于实现简单且对调度器的要求较低,Hansen/Mesa 语义是现代操作系统和编程语言(如 POSIX, Java)的普遍选择。这意味着使用这些系统的条件变量时,必须遵循特定的编程模式。

Hansen/Mesa 语义下的正确用法:while 循环

由于 Mesa 语义下,线程被唤醒时不保证条件仍然满足,因此必须cond_wait 返回后重新检查条件。这通常通过将 cond_wait 放在 while 循环中实现:

1
2
3
4
5
6
7
// 正确模式(Mesa 语义)
mutex_lock(&lk);
while (!condition_is_met) { // 使用 while 而不是 if
cond_wait(&cv, &lk);
}
// 条件满足,执行操作...
mutex_unlock(&lk);

原因:

  1. 虚假唤醒(Spurious Wakeup):某些实现可能在没有 signal 的情况下唤醒线程。
  2. 信号时机:在你被唤醒和重新获得锁之间,其他线程可能已经运行并再次改变了条件。
  3. broadcast:如果使用 cond_broadcast,多个线程被唤醒,但只有一个(或少数几个)线程的条件可能真正满足。

经验法则

在 Mesa 语义下,cond_wait 必须始终在 while 循环中调用。

生产者-消费者问题的条件变量解决方案

现在我们可以使用条件变量和 while 循环模式来正确解决生产者-消费者问题。

方案 1:使用两个条件变量

为生产者(等待缓冲区非满)和消费者(等待缓冲区非空)分别使用不同的条件变量,可以更精确地唤醒目标线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
mutex_t lk = MUTEX_INIT();
cond_t cv_p = COND_INIT(); // 生产者的 CV(如果满则等待)
cond_t cv_c = COND_INIT(); // 消费者的 CV(如果空则等待)
int n; // 容量
int depth = 0; // 当前深度

void produce() {
while(1) {
mutex_lock(&lk);
while (!(depth < n)) { // while 循环检查条件
cond_wait(&cv_p, &lk); // 等待缓冲区非满
}
// 条件满足(depth < n)
printf("(");
depth++;
// 唤醒可能在等待的消费者
cond_signal(&cv_c);
mutex_unlock(&lk);
}
}

void consume() {
while(1) {
mutex_lock(&lk);
while (!(depth > 0)) { // while 循环检查条件
cond_wait(&cv_c, &lk); // 等待缓冲区非空
}
// 条件满足(depth > 0)
printf(")");
depth--;
// 唤醒可能在等待的生产者
cond_signal(&cv_p);
mutex_unlock(&lk);
}
}
  • 优点signal 的目标明确,只唤醒需要被唤醒的角色(生产者唤醒消费者,消费者唤醒生产者)。
  • 缺点:需要管理两个条件变量。

方案 2:使用单个条件变量和 broadcast

使用一个条件变量,并在状态改变时唤醒所有等待者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
mutex_t lk = MUTEX_INIT();
cond_t cv = COND_INIT(); // 单个 CV
int n, depth = 0;

void produce() {
while(1) {
mutex_lock(&lk);
while (!(depth < n)) {
cond_wait(&cv, &lk);
}
printf("(");
depth++;
// 唤醒所有等待者(生产者和消费者)
cond_broadcast(&cv);
mutex_unlock(&lk);
}
}

void consume() {
while(1) {
mutex_lock(&lk);
while (!(depth > 0)) {
cond_wait(&cv, &lk);
}
printf(")");
depth--;
// 唤醒所有等待者
cond_broadcast(&cv);
mutex_unlock(&lk);
}
}
  • 优点:实现相对简单,只需一个 CV。
  • 缺点:可能导致不必要的唤醒(例如,生产者唤醒了其他生产者,但缓冲区仍然是满的)。由于 while 循环的存在,这些被错误唤醒的线程会再次检查条件并继续等待,因此正确性不受影响,但可能带来性能开销(所谓的 thundering herd 问题)。

单 CV + signal 的陷阱

如果只用一个 CV 并且使用 cond_signal,可能会出现问题。考虑缓冲区大小为 1,两个消费者 C1, C2 和一个生产者 P1:

  1. C1, C2 发现缓冲区为空,调用 cond_wait(&cv, &lk) 阻塞。
  2. P1 生产一个 item,调用 cond_signal(&cv)。假设 C1 被唤醒。
  3. 在 C1 重新获取锁之前,P1 再次运行,发现缓冲区已满 (depth=1),调用 cond_wait(&cv, &lk) 阻塞。
  4. C1 获取锁,消费 item,depth 变为 0。C1 调用 cond_signal(&cv)
  5. 问题:此时等待队列中有 P1 (需要 depth < 1) 和 C2 (需要 depth > 0)。如果 signal 唤醒了 C2,C2 会发现 depth == 0 并再次 wait。而 P1 则永远等待下去,导致死锁

使用两个 CV 或使用 `broadcast` 可以避免此问题。

条件变量使用法则

  1. 必须有关联的互斥锁:保护共享状态(条件)。
  2. 必须有共享状态:用于判断条件是否满足。
  3. wait/signal/broadcast 时必须持有锁。(wait 内部会原子释放和重获锁)
  4. wait 必须在 while 循环内(Mesa 语义)。
  5. 优先为不同条件使用不同 CV,除非 broadcast 是明确需要或更简单的选择。

应用:协调计算依赖

条件变量的 wait/signal 机制非常适合用于实现计算依赖关系(happens-before)。

  • 计算图:可以将一个复杂的计算任务表示为一个有向无环图(DAG),其中节点 vv 代表计算任务(事件),边 (u,v)E(u, v) \in E 表示任务 vv 依赖于任务 uu 的结果(uu happens-before vv)。
  • 并行化:没有依赖关系(图中没有路径相连)的任务可以并行执行。

通用并行化方案

  1. 为每个计算节点 vv 分配一个线程(或将其视为一个待调度任务)。
  2. 等待条件:节点 vv 开始计算前,必须 wait 直到所有前驱节点 uu(即存在边 (u,v)(u, v)) 都已完成。
  3. 完成信号:节点 uu 完成计算后,需要 signal(或 broadcast)通知所有后继节点 vv(即存在边 (u,v)(u, v)),告知它们依赖的条件可能已满足。

示例:并行化动态规划

动态规划算法通常涉及计算子问题的最优解,子问题之间存在依赖关系,形成一个 DAG。

编辑距离计算将字符串 A 转换为字符串 B 所需的最少编辑操作(插入、删除、替换)次数。有递推公式:dist(i, j) 依赖于 dist(i-1, j), dist(i, j-1), dist(i-1, j-1)

  • 单线程:通常按行、按列或按对角线(拓扑序)计算。
  • 多线程:观察到同一条反斜线i + j = const)上的所有 dist(i, j) 可以并行计算,因为它们只依赖于上一条反斜线 i + j - 1i + j - 2 上的结果。
    • 可以为每条反斜线(或每个节点)创建一个任务/线程。
    • 使用条件变量(或信号量)来同步:计算 dist(i, j) 的线程需要等待计算 dist(i-1, j), dist(i, j-1), dist(i-1, j-1) 的线程完成。完成后,发出信号。

通用并发设计框架:调度器-工作者

一种常见的并发模式是将任务调度和执行分离:

  • 调度器线程(生产者):维护计算图(或任务列表)和依赖关系。根据已完成的任务,找出当前可以运行的任务(满足依赖条件),将它们放入一个就绪队列,并 signal 工作者线程。
  • 工作者线程(消费者):从就绪队列中取出任务并执行。执行前可能需要 wait 等待调度器的信号(如果队列为空)。执行完成后,通知调度器任务已完成(可能通过 signal 或更新共享状态)。这类似于线程池(Thread Pool)模型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 概念代码
Queue ready_queue;
Mutex queue_lock;
CondVar queue_not_empty;
// ... 任务依赖、完成状态等共享数据 ...
Mutex state_lock;
CondVar state_changed; // 用于通知调度器

void T_worker() {
while (1) {
mutex_lock(&queue_lock);
while (ready_queue.empty()) {
cond_wait(&queue_not_empty, &queue_lock);
}
Task task = ready_queue.pop();
mutex_unlock(&queue_lock);

task.run(); // 执行计算

// 通知调度器任务完成
mutex_lock(&state_lock);
update_completion_status(task);
cond_signal(&state_changed); // 或 broadcast
mutex_unlock(&state_lock);
}
}

void T_scheduler() {
while (!all_tasks_done()) {
mutex_lock(&state_lock);
// 等待工作者完成任务,触发状态更新
// 可以使用 cond_wait(&state_changed, &state_lock) 或定期检查
List<Task> ready_tasks = find_ready_tasks(); // 根据依赖和完成状态查找
mutex_unlock(&state_lock);

mutex_lock(&queue_lock);
for (Task task : ready_tasks) {
ready_queue.push(task);
cond_signal(&queue_not_empty); // 通知工作者
}
mutex_unlock(&queue_lock);

// 可能需要等待 state_changed 信号
mutex_lock(&state_lock);
while(find_ready_tasks().empty() && !all_tasks_done()) {
cond_wait(&state_changed, &state_lock);
}
mutex_unlock(&state_lock);
}
}

复杂示例:打印 fish

  • 任务:有三种线程,分别无限打印 <, >, -。需要同步它们,使得屏幕交替打印 <><_><>_ 的组合序列。
  • 分析:这是一个状态同步问题。打印哪个字符取决于当前处于哪个状态。可以用有限状态机(FSM)来描述合法的前缀。还没学所以略。

信号量(Semaphore)

信号量(Semaphore)是 Dijkstra 在 1965 年提出的另一种强大的同步原语。与条件变量不同,信号量包含一个内部状态:一个非负整数值。

信号量

信号量是一个整数变量,只能通过两个原子操作来访问和修改:

  • P(sem)(来自「荷兰语」[^dutch] prolaag,尝试减少),也称为 wait(), down(), acquire()
  • V(sem) (来自荷兰语 verhoog,增加),也称为 signal(), up(), post(), release()

核心操作

  1. P(sem_t *sem)sem_wait(sem_t *sem)
    • 原子地:
      1. 检查信号量 sem 的值。
      2. 如果 sem > 0,则将其减 1,操作完成,线程继续执行。
      3. 如果 sem <= 0,则线程阻塞,直到其他线程调用 V(sem) 使其值变为正数。
  2. V(sem_t *sem)sem_post(sem_t *sem)
    • 原子地:
      1. 将信号量 sem 的值加 1。
      2. 如果此时有线程因 P(sem) 而阻塞在该信号量上,则唤醒其中一个线程。

POSIX API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <semaphore.h>

sem_t sem;

// 初始化信号量
// value: 初始值
// pshared: 0 表示线程间共享,非 0 表示进程间共享
int sem_init(sem_t *sem, int pshared, unsigned int value);

// 销毁信号量
int sem_destroy(sem_t *sem);

// P 操作(等待/减少)
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem); // 非阻塞版本
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); // 带超时的版本

// V 操作(发布/增加)
int sem_post(sem_t *sem);

// 获取当前值(主要用于调试)
int sem_getvalue(sem_t *sem, int *sval);

基于互斥锁和条件变量的实现

信号量的行为可以用互斥锁和条件变量来模拟:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typedef struct {
int value; // 信号量的值
pthread_cond_t cond; // 用于阻塞等待的条件变量
pthread_mutex_t lock; // 保护 value 和 cond 操作的互斥锁
} sem_t;

void sem_init(sem_t *s, int pshared, int value) {
// pshared 参数在此实现中忽略
s->value = value;
pthread_cond_init(&s->cond, NULL);
pthread_mutex_init(&s->lock, NULL);
}

void sem_wait(sem_t *s) { // P 操作
pthread_mutex_lock(&s->lock);
while (s->value <= 0) { // 值不大于 0 则等待
pthread_cond_wait(&s->cond, &s->lock);
}
s->value--; // 值大于 0,减 1
pthread_mutex_unlock(&s->lock);
}

void sem_post(sem_t *s) { // V 操作
pthread_mutex_lock(&s->lock);
s->value++; // 值加 1
// 值增加后,可能使等待条件(value > 0)满足,唤醒一个等待者
pthread_cond_signal(&s->cond);
pthread_mutex_unlock(&s->lock);
}

void sem_destroy(sem_t *s) {
pthread_cond_destroy(&s->cond);
pthread_mutex_destroy(&s->lock);
}

注意:sem_post 只需要 cond_signal 而不是 broadcast,因为 P 操作每次只消耗一个单位的「资源」,唤醒一个线程就足够了。while 循环在 sem_wait 中仍然是必要的,以处理虚假唤醒和保证条件的正确检查。

信号量的用途与「整数」理解

信号量的整数值可以被赋予不同的含义,从而实现不同的同步模式:

  1. 互斥

    • 将信号量初始化为 1
    • P(sem):获取锁(资源数为 1,只有一个线程能成功 P 操作)。
    • V(sem):释放锁(将资源数恢复为 1)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    sem_t mutex;
    sem_init(&mutex, 0, 1); // 初始化为 1

    void critical_section_example() {
    sem_wait(&mutex); // 进入临界区(P)
    // ... 临界区代码 ...
    sem_post(&mutex); // 退出临界区(V)
    // ... 剩余区代码 ...
    }
  2. 顺序控制(Ordering/Happens-Before)

    • 将信号量初始化为 0
    • 线程 A 在完成某个操作后调用 V(sem)
    • 线程 B 在需要等待 A 完成时调用 P(sem)。由于初始值为 0,B 会阻塞,直到 A 调用 V 使值变为 1。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    sem_t order_sem;
    sem_init(&order_sem, 0, 0); // 初始化为 0

    // 线程 T2(子线程)
    void* child(void *arg) {
    do_something_first();
    sem_post(&order_sem); // V 操作:通知 T1 我已完成
    return NULL;
    }

    // 线程 T1(主线程)
    int main() {
    pthread_t c;
    create_thread(&c, child);
    // ... 其他操作 ...
    sem_wait(&order_sem); // P 操作:等待 T2 完成
    do_something_after_child();
    return 0;
    }
  3. 通用资源计数(Counting Semaphore)

    • 将信号量初始化为可用资源的数量 N
    • P(sem):请求/获取一个资源。如果 value > 0,则资源可用,value 减 1;否则阻塞等待。
    • V(sem):释放/归还一个资源。value 加 1,并可能唤醒一个等待资源的线程。
    • 这正是生产者-消费者问题中缓冲区空位和数据项数量的抽象。

利用信号量解决生产者-消费者问题

信号量提供了一种非常自然和简洁的方式来解决生产者-消费者问题。

  • empty 信号量:表示缓冲区中空闲槽位的数量,初始值为 MAX(缓冲区容量)。生产者 P 此信号量。
  • full 信号量:表示缓冲区中已填充数据的数量,初始值为 0。消费者 P 此信号量。
  • mutex 信号量(或互斥锁):用于保护对缓冲区的互斥访问,初始值为 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#define MAX 10 // 缓冲区容量
sem_t empty;
sem_t full;
sem_t mutex; // 或者使用 pthread_mutex_t

// 缓冲区数据结构
int buffer[MAX];
int fill_ptr = 0;
int use_ptr = 0;

void put(int value) { /* 放入 buffer */ }
int get() { /* 从 buffer 取出 */ }

void init() {
sem_init(&empty, 0, MAX); // MAX 个空槽位
sem_init(&full, 0, 0); // 0 个数据项
sem_init(&mutex, 0, 1); // 互斥访问锁
}

void *producer(void *arg) {
int i = 0;
while (1) {
// 生产数据 item ...
int item = i++;

sem_wait(&empty); // 等待并获取一个空槽位(P(empty))
sem_wait(&mutex); // 获取缓冲区访问锁(P(mutex))
put(item); // 访问临界区:放入数据
sem_post(&mutex); // 释放缓冲区访问锁(V(mutex))
sem_post(&full); // 通知增加了一个数据项(V(full))
}
}

void *consumer(void *arg) {
while (1) {
sem_wait(&full); // 等待并获取一个数据项 (P(full))
sem_wait(&mutex); // 获取缓冲区访问锁 (P(mutex))
int item = get(); // 访问临界区:取出数据
sem_post(&mutex); // 释放缓冲区访问锁 (V(mutex))
sem_post(&empty); // 通知增加了一个空槽位 (V(empty))

// 消费数据 item ...
printf("Consumed: %d\n", item);
}
}

死锁陷阱:错误的加锁顺序

如果在消费者(或生产者)中颠倒 P(full)P(mutex) 的顺序:

1
2
3
4
// 错误顺序 - 可能导致死锁
sem_wait(&mutex); // 先获取互斥锁
sem_wait(&full); // 再等待数据项
// …

死锁场景:

  1. 消费者 C 运行,获取 mutex 锁。
  2. C 尝试 sem_wait(&full),但缓冲区为空(full 值为 0),C 阻塞,但仍然持有 mutex
  3. 生产者 P 运行,尝试 sem_wait(&mutex) 以访问缓冲区放入数据,但 mutex 已被 C 持有,P 阻塞。
  4. C 等待 P 生产数据(V(full)),P 等待 C 释放 mutex死锁发生

正确做法:总是先获取表示资源可用性(emptyfull)的信号量,再获取用于互斥访问(mutex)的信号量/锁

P/V 操作后的代码保护

在上面基于互斥锁和条件变量的信号量实现中,sem_waitsem_post 内部使用了锁,但在函数返回时锁已经释放。因此,紧跟在 sem_waitsem_post 调用之后的代码并不受该信号量内部锁的保护。如果这些代码需要访问共享资源(如生产者-消费者例子中的 buffer),则必须使用额外的互斥锁(如 mutex 信号量)来保护。

信号量的局限性

虽然信号量很强大,但在某些复杂场景下可能不如条件变量灵活:

  • 复杂条件:信号量只能表示整数状态。对于涉及多个变量或非整数条件的判断(如 fish 例子中的状态转移),使用条件变量配合显式状态检查通常更清晰。
  • 唤醒目标V 操作通常只唤醒一个等待者,且无法精确指定唤醒哪种类型的等待者(如果多种线程等待同一个信号量)。条件变量可以通过使用不同的 CV 来区分等待条件和唤醒目标。例如,在单 CV + signal 的生产者-消费者陷阱中,信号量也可能遇到类似问题(消费者 V 操作唤醒了另一个消费者而不是生产者)。

总结

特性 条件变量(Condition Variable) 信号量(Semaphore)
内部状态 无(无记忆) 非负整数(有记忆)
核心操作 wait, signal, broadcast P(wait/down), V(post/up)
关联锁 必须与互斥锁配合使用 可独立使用,或与互斥锁配合
条件检查 程序员必须在 while 循环中显式检查共享状态 P 操作隐式检查 value > 0
唤醒 signal 唤醒一个,broadcast 唤醒所有 V 操作唤醒一个
主要用途 等待任意复杂条件满足、协调线程执行 互斥、顺序控制、资源计数
灵活性 更高,适用于复杂条件 相对较低,主要用于计数和简单同步
易用性 模式固定(while 循环),但概念稍复杂 概念直接(资源获取/释放),但易因顺序错误死锁

选择哪种同步原语取决于具体问题的需求。对于简单的互斥、顺序控制或资源计数,信号量通常更简洁。对于需要等待复杂条件或需要更精细唤醒控制的场景,条件变量通常是更好的选择。