互斥

概念

  • 临界区(critical section):访问共享资源的一段代码,资源通常是一个变量或数据结构。
  • 竞态条件(race condition):出现在多个执行线程大致同时进入临界区时,它们都试图更新共享的数据结构,导致非预期的结果。

当程序含有一个或多个竞态条件,程序的输出会因运行而异,具体取决于哪些线程在何时运行,因此结果是不确定的(non-deterministic)。

如何设计机制使得临界区避免出现竞态条件是「并发」的重要主题。

对于运行程序(尤其是并发程序)的两个重要属性:

  • 安全性:没有坏事发生
    • 安全性要求执行中的任何有限步骤内都保持这个性质。
    • 如:执行过程中不可出现除 0 错误
  • 活性:好事终将发生
    • 活性要求只要在最终能满足要求即可,一个隐含的要求是执行中不能发生「不可挽回」的步骤。
    • 如:执行最终停止(执行中间出现一个无限循环就「不可挽回」)

临界区解决方案需满足条件:

  • 互斥(Mutual Exclusion):临界区内最多只能有一个线程(安全性)
  • 行进(Progress):如果当前临界区内没有线程,并且有线程想要进入临界区,那么最终某个想要进入临界区的线程会进入该临界区(活性)
  • 有界等待(bounded waiting):如果某个线程想要进入临界区,那么其等待的期限有限(期间其他线程进入该临界区次数有上限),不可一直排队等待(Fairness/No starvation)
    • 如果这个上限没有被指定,那么这就是一个活性属性,其最终会进入。
    • 如果这个上限被指定具体数字,那么这就是一个安全性属性,因为任何一次执行的有限步骤内,其等待进入的次数只要超过这个上限就发生了「坏事」。

除此以外还有一个临界区解决方案所关心的性能(performance),即进入和退出该临界区的两个操作应该相对于在该临界区内做的计算而言尽可能的小。

经验法则

设计并发算法时,优先考虑安全性。

(Lock)是一个变量,其保存了锁在某一时刻的状态。状态有:

  • 可用的(available/unlocked/free),表示没有线程持有锁。
  • 被占用的(acquired/locked/held),表示有一个线程持有锁,正处于临界区。

其提供两个配对操作:

  • lock()/acquire():尝试获取锁,如果没有其他线程持有锁,该线程会获得锁,进入临界区,否则不会返回(该线程会卡在那里)。
  • unlock()/release():锁变成可用,之前如果有因获得锁操作没成功卡在那的线程,那么其中一个会进入临界区。

锁为程序员提供了最小程度的调度控制,通过给临界区加锁,保证临界区内只有一个活跃变量(互斥)。

1
2
3
4
5
6
7
8
9
10
long sum = 0;

void *T_sum(void *arg) {
for (int i = 0; i < 1000000; i++) {
lock();
sum++;
unlock();
}
return NULL;
}

如何实现这样的锁?

关中断

采用「关中断」可以实现锁,关中断后当前程序状态机就能独占计算机系统,然后 unlock 就是再次打开中断。

  • x86:cli 清除 eflags 寄存器的 IF 位;sti 设置 IF
  • ARM64:msr daifset, #2 设置 DAIF 寄存器的 IRQ 位;msr daifclr, #2 清除 IRQ
  • RISC-V:清除 mstatus 寄存器的 MIE 位;设置 mstatus 寄存器的 MIE

但是不是所有中断都可以被屏蔽,处理器有不可屏蔽中断(non-maskable interrupts),主要是为了一些不可处理或通知一些不可恢复的错误,如内部芯片系统错误、系统数据损坏等。

同时这种方式有问题,若临界区代码死循环了,那么整个系统也就卡死了。此外中断关闭时间过长会导致很多其他重要的外界响应丢失,如磁盘 I/O 完成事件。

另外关中断是特权指令,用户态应用无法执行,只有操作系统有权限。在操作系统内核的实现中,关闭中断倒是一个常见的操作。

以及对于多处理器系统,关中断是不够的,因为每个处理器有独立的寄存器组,中断也只是每个处理器内部状态。

lock 标志位

还可以尝试通过软件层面,例如通过 lock 标志位。

1
2
3
4
5
6
7
8
9
10
11
12
int flag = 0;

void lock() {
while (flag == 1) { // 自旋的观测 flag
// busy wait
}
flag = 1;
}

void unlock() {
flag = 0;
}

这里假设了 load 一个变量并测试它是原子的(flag == 1),同时假设设置一个变量是原子的(flag = 1)。

但是这样的实现仍然会有问题,这个在上一篇介绍过了,检验 flag 和设置 flag 之间的操作不是原子的,可能会出现竞态条件。

此外上面的假设在单处理器时代是合理的设定,但现代计算系统已经不正确了,因为现代处理器有多级缓存,多核,乱序执行等特性,这些特性会导致上面的假设不成立。

判定条件

如果每个线程都采用同一个判定 test flag == 1,那么完全存在所有的线程可能得到同样的 true。但如果给每个线程的判定条件都不一样,且判定条件不可能同时为 true,那么就不会出现这个问题。一个线程为 true 意味着其他线程为 false,逻辑上达成了互斥。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// T1
void lock() {
while (flag == 0) {
// busy wait
}
}

void unlock() {
flag = 0;
}

// T2
void lock() {
while (flag == 1) {
// busy wait
}
}

void unlock() {
flag = 1;
}

这个要求了内存顺序一致性模型才能成立。代码中的 while 循环依赖对 flag 的即时可见性。若没有内存一致性保证,线程可能读取到陈旧的 flag 值,导致逻辑错误。

unlock 会给其他线程机会,自己下一次 lock 判定会为 false,只有别的进程unlock 后自己的下一次lock 判定才会为 true。所以这个方法是让每个线程「严格轮转」的。

这样的实现是正确的,但是这样的实现会导致「饥饿」(starvation),即某个线程可能永远得不到锁。

因为这个方法一个线程能否得到一个锁完全依赖于另一个线程是否先进入了临界区,如果另一个线程一直不释放锁,那么这个线程就永远得不到锁。这违背了活性。

若线程 T1 释放锁后立即尝试重新获取,但 T2 尚未运行,此时 T1 将因 flag = 0(需等待 flag = 1)被阻塞,直到 T2 运行并释放锁。

Peterson 算法

Dekker 算法是第一个被证明是正确的互斥算法,但是它的实现比较复杂,Peterson 算法是一个更简单的算法。

主要思想就是之前尝试中的 test 的改进:除了测试 flag 之外,还看看是否有别的线程要进入临界区(这个部分可以避免之前尝试出现的可能没有「行进」问题)。因此,除了有一个全局变量来进行 flag == i 判定,还得有一个全局变量 intents 来记录是否有其他线程要进入临界区。

Peterson 原始算法只能在两个线程上工作,不过该算法很容易拓展到 NN 个线程上(当然,NN 必须提前知道)。

Peterson 算法通过两个同步机制实现互斥:

  • 主动声明意图:每个线程明确表达进入临界区的意愿
  • 轮转谦让机制:当多个线程同时竞争时,通过轮转变量决定谁先进入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool flag[2] = {false, false}; // 表示线程是否想进入临界区
int turn = 0; // 决定轮转顺序的关键变量

void lock(int self) { // self 是当前线程编号(0 或 1)
int other = 1 - self; // 计算对方线程编号

flag[self] = true; // 步骤 1:声明自己的意图
turn = other; // 步骤 2:主动让出优先权

// 步骤 3:谦让检查
while (flag[other] && turn == other) {
// 当对方也想进入临界区且轮转权在对方时,等待
}
}

void unlock(int self) {
flag[self] = false; // 退出时撤销自己的意图
}
%%{init: {'themeVariables': { 'edgeLabelBackground':'#FFF5E6'}}}%%
flowchart TD
    subgraph Thread_1["线程 1 操作路线"]
        direction TB
        T1_Entry(["进入 lock(1)"])
        T1_SetFlag["设置 flag[1] = true"]:::green
        T1_SetTurn[设置 turn = 0]:::blue
        T1_Check{等待条件?}
        T1_True[[循环等待]]:::red
        T1_False[[进入临界区]]:::orange
    end

    subgraph Thread_0["线程 0 操作路线"]
        direction TB
        T0_Entry(["进入 lock(0)"])
        T0_SetFlag["设置 flag[0] = true"]:::green
        T0_SetTurn[设置 turn = 1]:::blue
        T0_Check{等待条件?}
        T0_True[[循环等待]]:::red
        T0_False[[进入临界区]]:::orange
    end

    T0_Entry --> T0_SetFlag --> T0_SetTurn --> T0_Check
    T0_Check -->|"flag[1] == true<br>且 turn == 1"| T0_True -.-> T0_Check
    T0_Check -->|等待条件不满足| T0_False

    T1_Entry --> T1_SetFlag --> T1_SetTurn --> T1_Check
    T1_Check -->|"flag[0] == true<br>且 turn == 0"| T1_True -.-> T1_Check
    T1_Check -->|等待条件不满足| T1_False

    classDef green fill:#D5F5E3,stroke:#2ECC71;
    classDef blue fill:#D6EAF8,stroke:#3498DB;
    classDef red fill:#FADBD8,stroke:#E74C3C;
    classDef orange fill:#FDEBD0,stroke:#EB984E;

双重保险机制:

  • flag[] 数组确保线程能感知彼此的意图
  • turn 变量确保不会出现互相谦让导致的死锁

只有同时满足「对方想进入临界区」和「轮转权在对方」两个条件才会等待,因此确保了至少有一个进程能通过检查。

通过 turn 变量的原子性修改,确保当多个线程同时竞争时,最终只会有一个线程满足条件

示例场景:

  1. 线程 A 和 B 同时设置 turn 为对方
  2. 由于赋值是原子的,最终 turn 值由最后写入的线程决定
  3. 保证至少有一个线程能跳出等待循环

正确性【待补充】

现实的 loadstore 都不是原子的,甚至两个线程同一刻读的 flag 都不一致。因此现实架构上的 Peterson 算法其实是错误的。

在现代计算机体系架构下,由于多处理器以及乱序指令流的存在,需要硬件的支持,比如:内存屏障,例如 gcc 编译器可以将 __sync_synchronize() 编译为相应指令架构的内存屏障:

  • x86:mfence
  • ARM64:dmb ish(数据同步屏障)
  • RISC-V:fence(内存屏障)

硬件锁

上面提到的方案中,只要检验与设置两条指令可以原子化就可以了。软件方案需要用 Peterson 算法等来保证互斥,在现代计算机体系结构下还是有问题的,若硬件能支持两个操作的原子化,那么就不需要软件层面的保证了。

很多硬件架构提供了原子指令实现 Test-And-Set Lock (TSL)。x86 下利用 lock 前缀加上 cmpxchg 指令实现 TSL。

  • TSL (Test-And-Set Lock):检查标志位是否为 0,同时将其置为 1,返回旧值
    • 特性:单指令完成「检查 + 修改」的原子操作
  • CAS (Compare-And-Swap):比较内存值与预期值,若相等则替换为新值
    • 典型指令:cmpxchg(x86架构)
    • CAS 是更通用的原子操作,TSL 是 CAS 的特例(预期值固定为 0)

lock 前缀作用:

  • 总线锁定:通过 lock 前缀触发总线锁,阻止其他 CPU 访问该内存地址
  • 缓存一致性:触发缓存一致性协议(如 MESI),确保所有 CPU 缓存同步
  • 自旋等待:执行期间其他 CPU 访问该地址会被阻塞直到操作完成
1
2
3
4
5
6
7
8
9
10
11
12
13
// 原子比较交换操作
asm volatile(
"lock cmpxchg %2, %1" // 原子比较交换指令
: "+a" (expected) // 用 eax 寄存器保存预期值
: "m" (flag), // 内存中的锁标志
"r" (1) // 要设置的新值
: "memory", "cc" // 告知编译器内存和条件码会被修改
);

// 等价于
if (flag == 0) {
flag = 1;
}

有了 TSA 硬件支持,之前的锁可以实现如下:

1
2
3
4
5
6
7
8
9
10
11
int flag = 0; // 锁标志位

void lock() {
// 持续尝试直到成功获取锁
while(__sync_val_compare_and_swap(&flag, 0, 1) != 0);
// 等效于:while(TSL(flag) == 1);
}

void unlock() {
flag = 0; // 释放锁
}

事实上,所有 x86 CPU 都具有锁定一个特定内存地址的能力,当这个特定内存地址被锁定后,它就可以阻止其他的系统总线读取或修改这个内存地址。通过 Lock 指令前缀 + 具体指令,就可以在执行这个具体指令时「锁住」。

自旋锁的改进

上面介绍了自旋锁(spinlock),它通过忙等待(busy waiting)的方式来实现互斥。但是,自旋锁存在一些问题:

  • 公平性问题:当一个线程在自旋等待时,无法保证它最终能进入临界区。如果有其他线程持续进入临界区,并且调度器总是优先调度它们,那么当前自旋的线程可能会一直等待,违背了有界等待原则。
  • 性能问题:自旋锁会让等待的线程持续占用 CPU 资源,进行无意义的循环检查。

为了解决公平性问题,可以引入「排队」机制。

排号自旋锁(Ticket Lock)

排号自旋锁的基本思想是:

  1. 每个尝试进入临界区的线程获取一个唯一的票号(ticket)
  2. 维护一个全局的当前服务号(turn)
  3. 只有当线程持有的票号与当前服务号相同时,才能进入临界区
  4. 线程退出临界区时,递增服务号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
typedef struct {
int ticket; // 当前发放的最大票号
int turn; // 当前应该进入的票号
} lock_t;

lock_t lock;

void lock_init() {
lock.ticket = 0;
lock.turn = 0;
}

void lock() {
// 原子地获取票号
int myturn = __sync_fetch_and_add(&lock.ticket, 1);
// 等待轮到自己
while (lock.turn != myturn) {
; // 自旋
}
}

void unlock() {
// 递增服务号
__sync_fetch_and_add(&lock.turn, 1);
}

这里使用了 GCC 内建的原子操作 __sync_fetch_and_add,它等价于:

1
2
3
4
5
6
7
// 将 1 加到 flag.ticket,并返回 flag.ticket 的「旧」值给 myturn
asm volatile (
"lock xaddl %0, %1"
: "+r" (myturn), "+m" (flag.ticket)
:
: "memory", "cc"
);

flag.ticketflag.turn 以及线程数的关系:

  • flag.ticket 表示下一个可用的票号,其值等于已进入或正在等待进入临界区的线程总数。
  • flag.turn 表示当前持有锁或被允许进入临界区的线程的票号。
  • 活跃线程数(包括正在临界区内执行的和正在自旋等待的)始终小于等于 flag.ticket - flag.turn

GCC 提供了很多内建的原子操作函数,可以编写跨平台的原子操作,而不仅限于 x86 平台。

  • type __sync_fetch_and_add(type *ptr, type value)
  • bool __sync_bool_compare_and_swap(type *ptr, type oldval type newval)
  • type __sync_lock_test_and_set(type *ptr, type value)
  • void __sync_lock_release(type *ptr)

细粒度锁

在多线程程序中,并非所有线程都需要互斥访问所有共享资源。如果对所有共享资源都使用同一个锁(粗粒度锁),会限制程序的并发性能。

细粒度锁(fine-grained lock)的思想是:将共享资源划分为多个更小的部分,每个部分用一个独立的锁来保护。这样,不同线程可以并发地访问不同的资源部分,提高了并发度。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 粗粒度锁
void lock();
void unlock();
// 粗粒度锁锁住的时候,所有线程都无法访问共享资源

// 细粒度锁
typedef struct {
// ...
} lock_t;

void lock(lock_t *lk);
void unlock(lock_t *lk);
// 可以只锁住部分共享资源,其他线程可以并发访问其他部分

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 粗粒度锁(全局唯一锁)
void transfer(Account* a, Account* b) {
lock(); // 锁住整个银行系统
// 转账操作…
unlock();
}

// 细粒度锁(每个账户独立锁)
void transfer(Account* a, Account* b) {
lock(&a->lock); // 只锁账户 A
lock(&b->lock); // 只锁账户 B
// 转账操作…
unlock(&b->lock);
unlock(&a->lock);
}

内核中的互斥锁

互斥锁的正确性依赖于正确的使用,lockunlock 必须按照正确的方式才能形成保护:

1
2
3
T1: spin_lock(&lk); sum++; spin_unlock(&lk);
T2: spin_lock(&lk); sum++; spin_unlock(&lk);
T3: sum++;

T3 不正确的访问就将整个并发逻辑破坏,因此正确的使用互斥锁是非常重要的。

在操作系统内核中,互斥锁的使用非常普遍,因为内核需要管理各种共享资源。但是,内核中的互斥锁实现需要考虑中断的影响。

中断与自旋锁

考虑以下场景:

  1. 一个线程在内核态执行,访问某个共享资源,并持有自旋锁。
  2. 此时,发生了一个中断,CPU 切换到中断处理程序。
  3. 中断处理程序也需要访问相同的共享资源,尝试获取自旋锁。
  4. 由于自旋锁已经被持有,中断处理程序会进入自旋等待。

由于中断处理程序的优先级通常高于普通线程(还高过系统调用),因此中断处理程序会一直占用 CPU,导致持有锁的线程无法继续执行,也就无法释放锁。这就导致了死锁。这种现象称为「优先级反转」(priority inversion)。

graph LR
    A[线程] -->|系统调用| B{内核态}
    B -->|获取锁| C[(共享资源)]
    C -->|中断发生| D[中断处理程序]
    D -->|尝试获取锁| C

    style A fill:#9ff,stroke:#333,stroke-width:2px
    style B fill:#cfc,stroke:#333,stroke-width:2px
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#ccf,stroke:#333,stroke-width:2px

解决方法:关中断

为了避免上述死锁问题,一种常见的做法是在获取自旋锁之前关闭中断,在释放锁之后再打开中断。这样可以确保在持有锁期间不会发生中断,从而避免了竞争条件。

但是,简单地在 lock 前关中断、unlock 后开中断是错误的。如果在获取锁之前中断已经是关闭的,那么在释放锁时打开中断就会破坏之前的状态。

因此,正确的做法是:

  1. 在获取锁之前,保存当前的中断状态(开或关)。
  2. 关闭中断。
  3. 获取锁。
  4. 释放锁。
  5. 恢复之前保存的中断状态。
graph LR
    classDef start_end fill:#fff,stroke:#333,stroke-width:2px,stroke-dasharray: 0,fill-opacity:1
    classDef correct_step fill:#e6f4ea,stroke:#2ecc40,stroke-width:2px
    classDef correct_cond fill:#e6f4ea,stroke:#2ecc40,stroke-width:2px,stroke-dasharray: 5 5
    classDef error_step fill:#feebe9,stroke:#e74c3c,stroke-width:2px
    classDef error_cond fill:#feebe9,stroke:#e74c3c,stroke-width:2px,stroke-dasharray: 5 5
    classDef warning fill:#fff8e7,stroke:#f1c40f,stroke-width:2px
    classDef highlight fill:#fff,stroke:#3498db,stroke-width:2px

    subgraph 正确实现
        direction LR
        H[开始]:::start_end --> I{保存中断状态}:::correct_step
        I --> J{获取锁}:::correct_cond
        J --> K[临界区]:::correct_step
        K --> L{释放锁}:::correct_cond
        L --> M{恢复中断状态}:::correct_step
        M --> N[结束]:::start_end
    end

    subgraph 错误实现
        direction LR
        A[开始]:::start_end --> B{关中断}:::error_step
        B --> C{获取锁}:::error_cond
        C --> D[临界区]:::error_step
        D --> E{释放锁}:::error_cond
        E --> F{开中断}:::error_step
        F --> G[结束]:::start_end
    end

    subgraph 场景说明
        direction LR
        O[进程 1: 中断已关闭]:::highlight --> P{尝试获取锁}:::warning
        P --> Q[incorrect_unlock 后, 中断错误开启]:::highlight
    end

    style H stroke:#333,stroke-width:2px
    style N stroke:#333,stroke-width:2px
    style G stroke:#333,stroke-width:2px
    linkStyle 0 stroke-width:2px

xv6 的实现

xv6 操作系统提供了一个自旋锁的实现,很好地处理了中断问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
typedef struct {
const char *name;
int status;
struct cpu *cpu;
} spinlock_t;

void spin_lock(spinlock_t *lk) {
// 关闭中断,避免死锁
push_off();

// 检查是否已经持有锁,如果是则 panic
if (holding(lk)) {
panic("acquire %s", lk->name);
}

// 获取锁
int got;
do {
got = atomic_xchg(&lk->status, LOCKED);
} while (got != UNLOCKED);

// 记录持有锁的 CPU
lk->cpu = mycpu;
}

void spin_unlock(spinlock_t *lk) {
if (!holding(lk)) {
panic("release %s", lk->name);
}

lk->cpu = NULL;
atomic_xchg(&lk->status, UNLOCKED);

// 恢复中断状态
pop_off();
}

其中,push_off()pop_off() 函数用于保存和恢复中断状态。它们可以嵌套调用,push_off() 记录中断关闭的次数,pop_off() 记录想要打开中断的次数,只有当想要打开中断的次数等于中断关闭的次数时,才会真正打开中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 关闭中断,并记录关闭前的状态
void push_off(void) {
int old = ienabled();
struct cpu *c = mycpu;

iset(false); // 关闭中断
if (c->noff == 0) {
c->intena = old;
}
c->noff += 1;
}

// 尝试打开中断,只有当嵌套计数为 0 时才真正打开
void pop_off(void) {
struct cpu *c = mycpu;
if (ienabled()) {
panic("pop_off - interruptible");
}
if (c->noff < 1) {
panic("pop_off");
}
c->noff -= 1;
if (c->noff == 0 && c->intena) {
iset(true); // 打开中断
}
}

用户态互斥锁的问题

在用户态程序中,自旋锁同样存在性能问题。如果临界区执行时间较长,或者持有锁的线程被调度出去,其他线程会浪费大量的 CPU 时间进行自旋。

内核态的临界区一般为短时间操作,因此自旋锁的性能问题不太明显。但在用户态,临界区可能包含长时间操作,同时用户态程序无法通过中断关闭来避免更严重的资源浪费。

sequenceDiagram
    participant ThreadA as 线程 A
    participant Lock as 自旋锁
    participant ThreadB as 线程 B
    participant ThreadC as 线程 C

    ThreadA->>Lock: 尝试获取锁
    Lock-->>ThreadA: 锁可用,获取成功
    ThreadA->>ThreadA: 进入临界区
    par 自旋等待
        ThreadB->>Lock: 尝试获取锁
        Lock-->>ThreadB: 锁不可用,开始空转
        ThreadC->>Lock: 尝试获取锁
        Lock-->>ThreadC: 锁不可用,开始空转
    end
    Note over ThreadB,ThreadC: 不断检查锁状态<br>(空转浪费 CPU)
    ThreadA->>ThreadA: 执行长时间任务或被调度出
    ThreadA->>Lock: 释放锁
    Lock-->>ThreadB: 锁可用,获取成功
    ThreadB->>ThreadB: 进入临界区

yield

一种简单的解决方案是使用 sched_yield() 系统调用,主动让出 CPU。

1
2
3
4
5
6
7
8
9
void yield_lock(spinlock_t *lk) {
while (xchg(&lk->locked, 1)) {
syscall(SYS_yield); // 让出 CPU
}
}

void yield_unlock(spinlock_t *lk) {
xchg(&lk->locked, 0);
}

但是,yield 只是暂时让出 CPU,该线程仍然处于就绪状态,随时可能被再次调度。在获得锁之前,反复的「被调度 > 让出 CPU」会带来大量不必要的上下文切换。

与操作系统调度器配合

更好的解决方案是,用户态的锁操作与操作系统调度器配合:

  • mutex_lock(&lk):尝试获取锁 lk。如果失败(lk 已被持有),利用系统调用阻塞当前线程,将其加入等待锁的队列,并让出 CPU。如果成功,则进入临界区。
  • mutex_unlock(&lk):释放锁。如果等待队列中有线程,则利用系统调用唤醒其中一个线程,将其从等待队列中移除,并加入就绪队列。

这样,等待锁的线程不会占用 CPU 资源,只有当锁可用时才会被唤醒。

于是操作系统需要对一个锁维护一个等待队列。

Linux Futex

然而实现没那么简单,由于操作并非原子的,唤醒信号与等待操作的时序错位,即当发送方(如生产者)在接收方(如消费者)进入等待状态之前发出唤醒信号,信号将因无等待者而被丢弃,后续接收方进入等待时可能永久阻塞,这被称为「唤醒丢失」(wakeup lost)。

Linux 提供了 futex (Fast Userspace Mutexes) 系统调用,用于支持用户态的互斥锁。

  • futex_wait(int *address, int expected):首先原子地检查 address 指向的值是否等于 expected。如果相等,则阻塞当前线程;否则,立即返回。
  • futex_wake(int *address):唤醒一个等待 address 指向的锁的线程。

基于这两个系统调用,可以实现如下互斥锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#define UNLOCK 0   // 锁空闲
#define ONE_HOLD 1 // 锁被单个线程持有
#define WAITERS 2 // 锁被持有,(可能)有线程在等待

void mutex_lock(spinlock_t *lk) {
// 尝试获取锁
// 如果 lk 的值是 UNLOCK,则将其设为 ONE_HOLD,返回原来的值
int c = cmpxchg(lk, UNLOCK, ONE_HOLD);
if (c != UNLOCK) {
// 锁已被持有
do {
if (c == WAITERS || cmpxchg(lk, ONE_HOLD, WAITERS) != UNLOCK) {
// 等待锁
futex_wait(lk, WAITERS);
}
} while ((c = cmpxchg(lk, UNLOCK, WAITERS)) != UNLOCK);
}
}

void mutex_unlock(spinlock_t *lk) {
// 减少等待计数
if (atomic_dec(lk) != ONE_HOLD) {
// 有等待者
*lk = UNLOCK;
futex_wake(lk);
}
}

这段代码比较复杂,其正确性在于:

  • mutex_lock 函数只会在成功锁住 lk 后才会返回。
    • 获得 lk 的线程本身会将 lk 原子的设为 ONE_HOLD,而其他等待线程将会将 lk 原子的设为 WAITERS,这使得除获得锁外的其他线程不可以在临界区,只有获得锁住线程在退出临界区调用 mutex_unlock 才会将 lk 设为 UNLOCK:这保证了上锁的正确性。
  • 等待的线程能够被唤醒,因为只要有等待的线程在 wait,那么此时的 lk 一定为 WAITERS。而 unlock 就一定会将某个等待的线程唤醒。
  • 由于 unlock 函数在 wake_up 之前先将 lk 设为 UNLOCK,因此上述即使发生了上述丢失 wakeup 的调度,由于 futex_wait 的特性,那个线程在调用 futex_wait 时会发现 lk 的值变了,因此不会休眠。

mutex_lock 函数的实现:

  1. 尝试快速获取锁
    int c = cmpxchg(lk, UNLOCK, ONE_HOLD);
    • 目的:原子交换操作,尝试将锁从 UNLOCK 转为 ONE_HOLD
    • 成功:若 c == UNLOCK,当前线程获得锁,直接返回。
    • 失败:锁已被其他线程持有,进入等待逻辑。
  2. 等待锁释放
    1
    2
    3
    4
    5
    do {
    if (c == WAITERS || cmpxchg(lk, ONE_HOLD, WAITERS) != UNLOCK) {
    futex_wait(lk, WAITERS);
    }
    } while ((c = cmpxchg(lk, UNLOCK, WAITERS)) != UNLOCK);
    • 状态升级:尝试将锁状态从 ONE_HOLD 升级为 WAITERS(标记有等待者)。
    • 休眠:通过 futex_wait 进入休眠,仅在锁状态为 WAITERS 时阻塞。
    • 唤醒后重试:被唤醒后再次尝试通过 cmpxchg 获取锁,确保只有一个等待线程成功。
      • 避免多个线程同时获取锁:多个被唤醒的线程可能同时尝试获取锁,通过 cmpxchg 保证只有一个线程成功。
      • 锁状态保持在 WAITERS,后续线程在尝试获取锁时,会直接进入休眠队列,而非自旋竞争。
      • 防御唤醒丢失。

mutex_unlock 函数的实现:

  1. 原子释放锁
    1
    2
    3
    4
    if (atomic_dec(lk) != ONE_HOLD) {
    *lk = UNLOCK;
    futex_wake(lk);
    }
    • 原子递减atomic_dec 将锁状态减 1,返回操作前的旧值。
    • 判断等待者
      • 若旧值为 ONE_HOLD,表示无等待者,直接释放锁。
      • 若旧值为 WAITERS,表示有等待者,需唤醒。
  2. 安全唤醒
    • 设置 UNLOCK:先将锁状态设为 UNLOCK,避免唤醒后立即竞争。
    • 唤醒线程:通过 futex_wake 唤醒一个等待线程。

正确性保障:

  1. 互斥性
    • 唯一持有者:只有成功执行 cmpxchg(lk, UNLOCK, ...) 的线程能进入临界区。
    • 状态屏障WAITERS 状态阻止新线程通过自旋获取锁,强制进入休眠队列。
  2. 避免唤醒丢失
    • futex_wait 的健壮性futex_wait 会检查 lk 的当前值是否匹配预期值(WAITERS)。若状态已变(例如被其他线程提前唤醒),则立即返回,避免永久阻塞。
    • 原子操作的顺序性unlock 中先设置 UNLOCK 再唤醒,确保被唤醒线程能看到最新状态。

两阶段锁

Linux 的 futex 实际上是一种两阶段锁(two-phase lock):

  1. Fast Path:首先尝试自旋一次。如果成功获取锁,则直接进入临界区。
  2. Slow Path:如果自旋失败,则根据情况使用 futex_wait 阻塞自己。

这种设计结合了自旋锁和阻塞锁的优点:

  • 当锁竞争不激烈时,使用自旋锁可以避免系统调用的开销。
  • 当锁竞争激烈时,使用阻塞锁可以避免 CPU 资源的浪费。

POSIX 线程锁

POSIX 线程库(pthread)提供了一套标准的线程同步 API,包括互斥锁。

1
2
3
4
5
6
7
8
9
#include <pthread.h>

pthread_mutex_t mutex;

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutex_lock 实现了上述需求(很多线程的争抢下依然能保持很好的性能),使用方法和自旋锁一致,大部分情况下(在有操作系统的情况)使用它即可。

并发数据结构

线程安全(thread-safe)的数据结构是指可以被多个线程并发访问的数据结构,也称为并发数据结构(concurrent data structure)。

要实现线程安全的数据结构,通常需要在访问和更新数据结构时使用锁(一把或多把)。

并发计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
typedef struct {
int value;
pthread_mutex_t lock;
} counter_t;

void init(counter_t *c) {
c->value = 0;
pthread_mutex_init(&c->lock, NULL);
}

void increment(counter_t *c) {
pthread_mutex_lock(&c->lock);
c->value++;
pthread_mutex_unlock(&c->lock);
}

void decrement(counter_t *c) {
pthread_mutex_lock(&c->lock);
c->value--;
pthread_mutex_unlock(&c->lock);
}

int get(counter_t *c) {
pthread_mutex_lock(&c->lock);
int rc = c->value;
pthread_mutex_unlock(&c->lock);
return rc;
}

这个计数器使用了一个互斥锁来保护对 value 的访问,因此是线程安全的。

提高并发性能

上述并发计数器的实现使用了单一的锁,限制了并发性能。为了提高性能,可以采用以下方法:

  • 每个 CPU 维护一个本地计数器:所有 CPU 共享一个全局计数器。
  • 本地计数器使用本地锁保护:全局计数器使用全局锁保护。
  • 本地计数器定期更新全局计数器:而不是每次都更新。

这样,大多数情况下只需要更新本地计数器,减少了对全局锁的竞争,提高了并发度。