互斥(进阶)

自旋锁的改进

在上一章中,我们介绍了自旋锁(spinlock),它通过忙等待(busy waiting)的方式来实现互斥。但是,自旋锁存在一些问题:

  • 公平性问题:当一个线程在自旋等待时,无法保证它最终能进入临界区。如果有其他线程持续进入临界区,并且调度器总是优先调度它们,那么当前自旋的线程可能会一直等待,违背了有界等待原则。
  • 性能问题:自旋锁会让等待的线程持续占用 CPU 资源,进行无意义的循环检查。

为了解决公平性问题,可以引入「排队」机制。

排号自旋锁(Ticket Lock)

排号自旋锁的基本思想是:

  1. 每个尝试进入临界区的线程获取一个唯一的票号(ticket)
  2. 维护一个全局的当前服务号(turn)
  3. 只有当线程持有的票号与当前服务号相同时,才能进入临界区
  4. 线程退出临界区时,递增服务号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
typedef struct {
int ticket; // 当前发放的最大票号
int turn; // 当前应该进入的票号
} lock_t;

lock_t lock;

void lock_init() {
lock.ticket = 0;
lock.turn = 0;
}

void lock() {
// 原子地获取票号
int myturn = __sync_fetch_and_add(&lock.ticket, 1);
// 等待轮到自己
while (lock.turn != myturn) {
; // 自旋
}
}

void unlock() {
// 递增服务号
__sync_fetch_and_add(&lock.turn, 1);
}

这里使用了 GCC 内建的原子操作 __sync_fetch_and_add,它等价于:

1
2
3
4
5
6
7
// 将 1 加到 flag.ticket,并返回 flag.ticket 的「旧」值给 myturn
asm volatile (
"lock xaddl %0, %1"
: "+r" (myturn), "+m" (flag.ticket)
:
: "memory", "cc"
);

flag.ticketflag.turn 以及线程数的关系:

  • flag.ticket 表示下一个可用的票号,其值等于已进入或正在等待进入临界区的线程总数。
  • flag.turn 表示当前持有锁或被允许进入临界区的线程的票号。
  • 活跃线程数(包括正在临界区内执行的和正在自旋等待的)始终小于等于 flag.ticket - flag.turn

GCC 提供了很多内建的原子操作函数,可以编写跨平台的原子操作,而不仅限于 x86 平台。

  • type __sync_fetch_and_add(type *ptr, type value)
  • bool __sync_bool_compare_and_swap(type *ptr, type oldval type newval)
  • type __sync_lock_test_and_set(type *ptr, type value)
  • void __sync_lock_release(type *ptr)

细粒度锁

在多线程程序中,并非所有线程都需要互斥访问所有共享资源。如果对所有共享资源都使用同一个锁(粗粒度锁),会限制程序的并发性能。

细粒度锁(fine-grained lock)的思想是:将共享资源划分为多个更小的部分,每个部分用一个独立的锁来保护。这样,不同线程可以并发地访问不同的资源部分,提高了并发度。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 粗粒度锁
void lock();
void unlock();
// 粗粒度锁锁住的时候,所有线程都无法访问共享资源

// 细粒度锁
typedef struct {
// ...
} lock_t;

void lock(lock_t *lk);
void unlock(lock_t *lk);
// 可以只锁住部分共享资源,其他线程可以并发访问其他部分

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 粗粒度锁(全局唯一锁)
void transfer(Account* a, Account* b) {
lock(); // 锁住整个银行系统
// 转账操作…
unlock();
}

// 细粒度锁(每个账户独立锁)
void transfer(Account* a, Account* b) {
lock(&a->lock); // 只锁账户 A
lock(&b->lock); // 只锁账户 B
// 转账操作…
unlock(&b->lock);
unlock(&a->lock);
}

内核中的互斥锁

互斥锁的正确性依赖于正确的使用,lockunlock 必须按照正确的方式才能形成保护:

1
2
3
T1: spin_lock(&lk); sum++; spin_unlock(&lk);
T2: spin_lock(&lk); sum++; spin_unlock(&lk);
T3: sum++;

T3 不正确的访问就将整个并发逻辑破坏,因此正确的使用互斥锁是非常重要的。

在操作系统内核中,互斥锁的使用非常普遍,因为内核需要管理各种共享资源。但是,内核中的互斥锁实现需要考虑中断的影响。

中断与自旋锁

考虑以下场景:

  1. 一个线程在内核态执行,访问某个共享资源,并持有自旋锁。
  2. 此时,发生了一个中断,CPU 切换到中断处理程序。
  3. 中断处理程序也需要访问相同的共享资源,尝试获取自旋锁。
  4. 由于自旋锁已经被持有,中断处理程序会进入自旋等待。

由于中断处理程序的优先级通常高于普通线程(还高过系统调用),因此中断处理程序会一直占用 CPU,导致持有锁的线程无法继续执行,也就无法释放锁。这就导致了死锁。这种现象称为「优先级反转」(priority inversion)。

graph LR
    A[线程] -->|系统调用| B{内核态}
    B -->|获取锁| C[(共享资源)]
    C -->|中断发生| D[中断处理程序]
    D -->|尝试获取锁| C
    
    style A fill:#9ff,stroke:#333,stroke-width:2px
    style B fill:#cfc,stroke:#333,stroke-width:2px
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#ccf,stroke:#333,stroke-width:2px

解决方法:关中断

为了避免上述死锁问题,一种常见的做法是在获取自旋锁之前关闭中断,在释放锁之后再打开中断。这样可以确保在持有锁期间不会发生中断,从而避免了竞争条件。

但是,简单地在 lock 前关中断、unlock 后开中断是错误的。如果在获取锁之前中断已经是关闭的,那么在释放锁时打开中断就会破坏之前的状态。

因此,正确的做法是:

  1. 在获取锁之前,保存当前的中断状态(开或关)。
  2. 关闭中断。
  3. 获取锁。
  4. 释放锁。
  5. 恢复之前保存的中断状态。
graph LR
    classDef start_end fill:#fff,stroke:#333,stroke-width:2px,stroke-dasharray: 0,fill-opacity:1
    classDef correct_step fill:#e6f4ea,stroke:#2ecc40,stroke-width:2px
    classDef correct_cond fill:#e6f4ea,stroke:#2ecc40,stroke-width:2px,stroke-dasharray: 5 5
    classDef error_step fill:#feebe9,stroke:#e74c3c,stroke-width:2px
    classDef error_cond fill:#feebe9,stroke:#e74c3c,stroke-width:2px,stroke-dasharray: 5 5
    classDef warning fill:#fff8e7,stroke:#f1c40f,stroke-width:2px
    classDef highlight fill:#fff,stroke:#3498db,stroke-width:2px

    subgraph 正确实现
        direction LR
        H[开始]:::start_end --> I{保存中断状态}:::correct_step
        I --> J{获取锁}:::correct_cond
        J --> K[临界区]:::correct_step
        K --> L{释放锁}:::correct_cond
        L --> M{恢复中断状态}:::correct_step
        M --> N[结束]:::start_end
    end

    subgraph 错误实现
        direction LR
        A[开始]:::start_end --> B{关中断}:::error_step
        B --> C{获取锁}:::error_cond
        C --> D[临界区]:::error_step
        D --> E{释放锁}:::error_cond
        E --> F{开中断}:::error_step
        F --> G[结束]:::start_end
    end

    subgraph 场景说明
        direction LR
        O[进程 1: 中断已关闭]:::highlight --> P{尝试获取锁}:::warning
        P --> Q[incorrect_unlock 后, 中断错误开启]:::highlight
    end

    style H stroke:#333,stroke-width:2px
    style N stroke:#333,stroke-width:2px
    style G stroke:#333,stroke-width:2px
    linkStyle 0 stroke-width:2px

xv6 的实现

xv6 操作系统提供了一个自旋锁的实现,很好地处理了中断问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
typedef struct {
const char *name;
int status;
struct cpu *cpu;
} spinlock_t;

void spin_lock(spinlock_t *lk) {
// 关闭中断,避免死锁
push_off();

// 检查是否已经持有锁,如果是则 panic
if (holding(lk)) {
panic("acquire %s", lk->name);
}

// 获取锁
int got;
do {
got = atomic_xchg(&lk->status, LOCKED);
} while (got != UNLOCKED);

// 记录持有锁的 CPU
lk->cpu = mycpu;
}

void spin_unlock(spinlock_t *lk) {
if (!holding(lk)) {
panic("release %s", lk->name);
}

lk->cpu = NULL;
atomic_xchg(&lk->status, UNLOCKED);

// 恢复中断状态
pop_off();
}

其中,push_off()pop_off() 函数用于保存和恢复中断状态。它们可以嵌套调用,push_off() 记录中断关闭的次数,pop_off() 记录想要打开中断的次数,只有当想要打开中断的次数等于中断关闭的次数时,才会真正打开中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 关闭中断,并记录关闭前的状态
void push_off(void) {
int old = ienabled();
struct cpu *c = mycpu;

iset(false); // 关闭中断
if (c->noff == 0) {
c->intena = old;
}
c->noff += 1;
}

// 尝试打开中断,只有当嵌套计数为 0 时才真正打开
void pop_off(void) {
struct cpu *c = mycpu;
if (ienabled()) {
panic("pop_off - interruptible");
}
if (c->noff < 1) {
panic("pop_off");
}
c->noff -= 1;
if (c->noff == 0 && c->intena) {
iset(true); // 打开中断
}
}

用户态互斥锁的问题

在用户态程序中,自旋锁同样存在性能问题。如果临界区执行时间较长,或者持有锁的线程被调度出去,其他线程会浪费大量的 CPU 时间进行自旋。

内核态的临界区一般为短时间操作,因此自旋锁的性能问题不太明显。但在用户态,临界区可能包含长时间操作,同时用户态程序无法通过中断关闭来避免更严重的资源浪费。

sequenceDiagram
    participant ThreadA as 线程 A
    participant Lock as 自旋锁
    participant ThreadB as 线程 B
    participant ThreadC as 线程 C

    ThreadA->>Lock: 尝试获取锁
    Lock-->>ThreadA: 锁可用,获取成功
    ThreadA->>ThreadA: 进入临界区
    par 自旋等待
        ThreadB->>Lock: 尝试获取锁
        Lock-->>ThreadB: 锁不可用,开始空转
        ThreadC->>Lock: 尝试获取锁
        Lock-->>ThreadC: 锁不可用,开始空转
    end
    Note over ThreadB,ThreadC: 不断检查锁状态<br>(空转浪费 CPU)
    ThreadA->>ThreadA: 执行长时间任务或被调度出
    ThreadA->>Lock: 释放锁
    Lock-->>ThreadB: 锁可用,获取成功
    ThreadB->>ThreadB: 进入临界区

yield

一种简单的解决方案是使用 sched_yield() 系统调用,主动让出 CPU。

1
2
3
4
5
6
7
8
9
void yield_lock(spinlock_t *lk) {
while (xchg(&lk->locked, 1)) {
syscall(SYS_yield); // 让出 CPU
}
}

void yield_unlock(spinlock_t *lk) {
xchg(&lk->locked, 0);
}

但是,yield 只是暂时让出 CPU,该线程仍然处于就绪状态,随时可能被再次调度。在获得锁之前,反复的「被调度 > 让出 CPU」会带来大量不必要的上下文切换。

与操作系统调度器配合

更好的解决方案是,用户态的锁操作与操作系统调度器配合:

  • mutex_lock(&lk):尝试获取锁 lk。如果失败(lk 已被持有),利用系统调用阻塞当前线程,将其加入等待锁的队列,并让出 CPU。如果成功,则进入临界区。
  • mutex_unlock(&lk):释放锁。如果等待队列中有线程,则利用系统调用唤醒其中一个线程,将其从等待队列中移除,并加入就绪队列。

这样,等待锁的线程不会占用 CPU 资源,只有当锁可用时才会被唤醒。

于是操作系统需要对一个锁维护一个等待队列。

Linux Futex

然而实现没那么简单,由于操作并非原子的,唤醒信号与等待操作的时序错位,即当发送方(如生产者)在接收方(如消费者)进入等待状态之前发出唤醒信号,信号将因无等待者而被丢弃,后续接收方进入等待时可能永久阻塞,这被称为「唤醒丢失」(wakeup lost)。

Linux 提供了 futex(Fast Userspace Mutexes)系统调用,用于支持用户态的互斥锁。

  • futex_wait(int *address, int expected):首先原子地检查 address 指向的值是否等于 expected。如果相等,则阻塞当前线程;否则,立即返回。
  • futex_wake(int *address):唤醒一个等待 address 指向的锁的线程。

基于这两个系统调用,可以实现如下互斥锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#define UNLOCK 0    // 锁空闲
#define ONE_HOLD 1 // 锁被单个线程持有
#define WAITERS 2 // 锁被持有,且有线程在等待

void mutex_lock(spinlock_t* lk) {
// 尝试获取锁
int c = cmpxchg(lk, UNLOCK, ONE_HOLD); // 如果 lk 的值是 UNLOCK,则将其设为 ONE_HOLD,返回原来的值
if (c != UNLOCK) {
// 锁已被持有
do {
if (c == WAITERS || cmpxchg(lk, ONE_HOLD, WAITERS) != UNLOCK) {
// 等待锁
futex_wait(lk, WAITERS);
}
} while ((c = cmpxchg(lk, UNLOCK, WAITERS)) != UNLOCK);
}
}

void mutex_unlock(spinlock_t* lk) {
// 减少等待计数
if (atomic_dec(lk) != ONE_HOLD) {
// 有等待者
*lk = UNLOCK;
futex_wake(lk);
}
}

这段代码比较复杂,其正确性在于:

  • mutex_lock 函数只会在成功锁住 lk 后才会返回。
    • 获得 lk 的线程本身会将 lk 原子的设为 ONE_HOLD,而其他等待线程将会将 lk 原子的设为 WAITERS,这使得除获得锁外的其他线程不可以在临界区,只有获得锁住线程在退出临界区调用 mutex_unlock 才会将 lk 设为 UNLOCK:这保证了上锁的正确性。
  • 等待的线程能够被唤醒,因为只要有等待的线程在 wait,那么此时的 lk 一定为 WAITERS。而 unlock 就一定会将某个等待的线程唤醒。
  • 由于 unlock 函数在 wake_up 之前先将 lk 设为 UNLOCK,因此上述即使发生了上述丢失 wakeup 的调度,由于 futex_wait 的特性,那个线程在调用 futex_wait 时会发现 lk 的值变了,因此不会休眠。

mutex_lock 函数的实现:

  1. 尝试快速获取锁
    int c = cmpxchg(lk, UNLOCK, ONE_HOLD);
    • 目的:原子交换操作,尝试将锁从 UNLOCK 转为 ONE_HOLD
    • 成功:若 c == UNLOCK,当前线程获得锁,直接返回。
    • 失败:锁已被其他线程持有,进入等待逻辑。
  2. 等待锁释放
    1
    2
    3
    4
    5
    do {
    if (c == WAITERS || cmpxchg(lk, ONE_HOLD, WAITERS) != UNLOCK) {
    futex_wait(lk, WAITERS);
    }
    } while ((c = cmpxchg(lk, UNLOCK, WAITERS)) != UNLOCK);
    • 状态升级:尝试将锁状态从 ONE_HOLD 升级为 WAITERS(标记有等待者)。
    • 休眠:通过 futex_wait 进入休眠,仅在锁状态为 WAITERS 时阻塞。
    • 唤醒后重试:被唤醒后再次尝试通过 cmpxchg 获取锁,确保只有一个等待线程成功。
      • 避免多个线程同时获取锁:多个被唤醒的线程可能同时尝试获取锁,通过 cmpxchg 保证只有一个线程成功。
      • 锁状态保持在 WAITERS,后续线程在尝试获取锁时,会直接进入休眠队列,而非自旋竞争。
      • 防御唤醒丢失。
        mutex_unlock 函数的实现:
  3. 原子释放锁
    1
    2
    3
    4
    if (atomic_dec(lk) != ONE_HOLD) {
    *lk = UNLOCK;
    futex_wake(lk);
    }
    • 原子递减atomic_dec 将锁状态减 1,返回操作前的旧值。
    • 判断等待者
      • 若旧值为 ONE_HOLD,表示无等待者,直接释放锁。
      • 若旧值为 WAITERS,表示有等待者,需唤醒。
  4. 安全唤醒
    • 设置 UNLOCK:先将锁状态设为 UNLOCK,避免唤醒后立即竞争。
    • 唤醒线程:通过 futex_wake 唤醒一个等待线程。

正确性保障:

  1. 互斥性
    • 唯一持有者:只有成功执行 cmpxchg(lk, UNLOCK, ...) 的线程能进入临界区。
    • 状态屏障WAITERS 状态阻止新线程通过自旋获取锁,强制进入休眠队列。
  2. 避免唤醒丢失
    • futex_wait 的健壮性futex_wait 会检查 lk 的当前值是否匹配预期值(WAITERS)。若状态已变(例如被其他线程提前唤醒),则立即返回,避免永久阻塞。
    • 原子操作的顺序性unlock 中先设置 UNLOCK 再唤醒,确保被唤醒线程能看到最新状态。

两阶段锁

Linux 的 futex 实际上是一种两阶段锁(two-phase lock):

  1. Fast Path:首先尝试自旋一次。如果成功获取锁,则直接进入临界区。
  2. Slow Path:如果自旋失败,则根据情况使用 futex_wait 阻塞自己。

这种设计结合了自旋锁和阻塞锁的优点:

  • 当锁竞争不激烈时,使用自旋锁可以避免系统调用的开销。
  • 当锁竞争激烈时,使用阻塞锁可以避免 CPU 资源的浪费。

POSIX 线程锁

POSIX 线程库(pthread)提供了一套标准的线程同步 API,包括互斥锁。

1
2
3
4
5
6
7
8
9
#include <pthread.h>

pthread_mutex_t mutex;

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutex_lock 实现了上述需求(很多线程的争抢下依然能保持很好的性能),使用方法和自旋锁一致,大部分情况下(在有操作系统的情况)使用它即可。

并发数据结构

线程安全(thread-safe)的数据结构是指可以被多个线程并发访问的数据结构,也称为并发数据结构(concurrent data structure)。

要实现线程安全的数据结构,通常需要在访问和更新数据结构时使用锁(一把或多把)。

并发计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
typedef struct {
int value;
pthread_mutex_t lock;
} counter_t;

void init(counter_t *c) {
c->value = 0;
pthread_mutex_init(&c->lock, NULL);
}

void increment(counter_t *c) {
pthread_mutex_lock(&c->lock);
c->value++;
pthread_mutex_unlock(&c->lock);
}

void decrement(counter_t *c) {
pthread_mutex_lock(&c->lock);
c->value--;
pthread_mutex_unlock(&c->lock);
}

int get(counter_t *c) {
pthread_mutex_lock(&c->lock);
int rc = c->value;
pthread_mutex_unlock(&c->lock);
return rc;
}

这个计数器使用了一个互斥锁来保护对 value 的访问,因此是线程安全的。

提高并发性能

上述并发计数器的实现使用了单一的锁,限制了并发性能。为了提高性能,可以采用以下方法:

  • 每个 CPU 维护一个本地计数器:所有 CPU 共享一个全局计数器。
  • 本地计数器使用本地锁保护:全局计数器使用全局锁保护。
  • 本地计数器定期更新全局计数器:而不是每次都更新。

这样,大多数情况下只需要更新本地计数器,减少了对全局锁的竞争,提高了并发度。