互斥(进阶)
自旋锁的改进
在上一章中,我们介绍了自旋锁(spinlock),它通过忙等待(busy waiting)的方式来实现互斥。但是,自旋锁存在一些问题:
- 公平性问题:当一个线程在自旋等待时,无法保证它最终能进入临界区。如果有其他线程持续进入临界区,并且调度器总是优先调度它们,那么当前自旋的线程可能会一直等待,违背了有界等待原则。
- 性能问题:自旋锁会让等待的线程持续占用 CPU 资源,进行无意义的循环检查。
为了解决公平性问题,可以引入「排队」机制。
排号自旋锁(Ticket Lock)
排号自旋锁的基本思想是:
- 每个尝试进入临界区的线程获取一个唯一的票号(ticket)
- 维护一个全局的当前服务号(turn)
- 只有当线程持有的票号与当前服务号相同时,才能进入临界区
- 线程退出临界区时,递增服务号
1 | typedef struct { |
这里使用了 GCC 内建的原子操作 __sync_fetch_and_add
,它等价于:
1 | // 将 1 加到 flag.ticket,并返回 flag.ticket 的「旧」值给 myturn |
flag.ticket
和 flag.turn
以及线程数的关系:
flag.ticket
表示下一个可用的票号,其值等于已进入或正在等待进入临界区的线程总数。flag.turn
表示当前持有锁或被允许进入临界区的线程的票号。- 活跃线程数(包括正在临界区内执行的和正在自旋等待的)始终小于等于
flag.ticket - flag.turn
。
GCC 提供了很多内建的原子操作函数,可以编写跨平台的原子操作,而不仅限于 x86 平台。
type __sync_fetch_and_add(type *ptr, type value)
bool __sync_bool_compare_and_swap(type *ptr, type oldval type newval)
type __sync_lock_test_and_set(type *ptr, type value)
void __sync_lock_release(type *ptr)
细粒度锁
在多线程程序中,并非所有线程都需要互斥访问所有共享资源。如果对所有共享资源都使用同一个锁(粗粒度锁),会限制程序的并发性能。
细粒度锁(fine-grained lock)的思想是:将共享资源划分为多个更小的部分,每个部分用一个独立的锁来保护。这样,不同线程可以并发地访问不同的资源部分,提高了并发度。
1 | // 粗粒度锁 |
举个例子:
1 | // 粗粒度锁(全局唯一锁) |
内核中的互斥锁
互斥锁的正确性依赖于正确的使用,lock
与 unlock
必须按照正确的方式才能形成保护:
1 | T1: spin_lock(&lk); sum++; spin_unlock(&lk); |
T3 不正确的访问就将整个并发逻辑破坏,因此正确的使用互斥锁是非常重要的。
在操作系统内核中,互斥锁的使用非常普遍,因为内核需要管理各种共享资源。但是,内核中的互斥锁实现需要考虑中断的影响。
中断与自旋锁
考虑以下场景:
- 一个线程在内核态执行,访问某个共享资源,并持有自旋锁。
- 此时,发生了一个中断,CPU 切换到中断处理程序。
- 中断处理程序也需要访问相同的共享资源,尝试获取自旋锁。
- 由于自旋锁已经被持有,中断处理程序会进入自旋等待。
由于中断处理程序的优先级通常高于普通线程(还高过系统调用),因此中断处理程序会一直占用 CPU,导致持有锁的线程无法继续执行,也就无法释放锁。这就导致了死锁。这种现象称为「优先级反转」(priority inversion)。
graph LR
A[线程] -->|系统调用| B{内核态}
B -->|获取锁| C[(共享资源)]
C -->|中断发生| D[中断处理程序]
D -->|尝试获取锁| C
style A fill:#9ff,stroke:#333,stroke-width:2px
style B fill:#cfc,stroke:#333,stroke-width:2px
style C fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#ccf,stroke:#333,stroke-width:2px
解决方法:关中断
为了避免上述死锁问题,一种常见的做法是在获取自旋锁之前关闭中断,在释放锁之后再打开中断。这样可以确保在持有锁期间不会发生中断,从而避免了竞争条件。
但是,简单地在 lock
前关中断、unlock
后开中断是错误的。如果在获取锁之前中断已经是关闭的,那么在释放锁时打开中断就会破坏之前的状态。
因此,正确的做法是:
- 在获取锁之前,保存当前的中断状态(开或关)。
- 关闭中断。
- 获取锁。
- 释放锁。
- 恢复之前保存的中断状态。
graph LR
classDef start_end fill:#fff,stroke:#333,stroke-width:2px,stroke-dasharray: 0,fill-opacity:1
classDef correct_step fill:#e6f4ea,stroke:#2ecc40,stroke-width:2px
classDef correct_cond fill:#e6f4ea,stroke:#2ecc40,stroke-width:2px,stroke-dasharray: 5 5
classDef error_step fill:#feebe9,stroke:#e74c3c,stroke-width:2px
classDef error_cond fill:#feebe9,stroke:#e74c3c,stroke-width:2px,stroke-dasharray: 5 5
classDef warning fill:#fff8e7,stroke:#f1c40f,stroke-width:2px
classDef highlight fill:#fff,stroke:#3498db,stroke-width:2px
subgraph 正确实现
direction LR
H[开始]:::start_end --> I{保存中断状态}:::correct_step
I --> J{获取锁}:::correct_cond
J --> K[临界区]:::correct_step
K --> L{释放锁}:::correct_cond
L --> M{恢复中断状态}:::correct_step
M --> N[结束]:::start_end
end
subgraph 错误实现
direction LR
A[开始]:::start_end --> B{关中断}:::error_step
B --> C{获取锁}:::error_cond
C --> D[临界区]:::error_step
D --> E{释放锁}:::error_cond
E --> F{开中断}:::error_step
F --> G[结束]:::start_end
end
subgraph 场景说明
direction LR
O[进程 1: 中断已关闭]:::highlight --> P{尝试获取锁}:::warning
P --> Q[incorrect_unlock 后, 中断错误开启]:::highlight
end
style H stroke:#333,stroke-width:2px
style N stroke:#333,stroke-width:2px
style G stroke:#333,stroke-width:2px
linkStyle 0 stroke-width:2px
xv6 的实现
xv6 操作系统提供了一个自旋锁的实现,很好地处理了中断问题。
1 | typedef struct { |
其中,push_off()
和 pop_off()
函数用于保存和恢复中断状态。它们可以嵌套调用,push_off()
记录中断关闭的次数,pop_off()
记录想要打开中断的次数,只有当想要打开中断的次数等于中断关闭的次数时,才会真正打开中断。
1 | // 关闭中断,并记录关闭前的状态 |
用户态互斥锁的问题
在用户态程序中,自旋锁同样存在性能问题。如果临界区执行时间较长,或者持有锁的线程被调度出去,其他线程会浪费大量的 CPU 时间进行自旋。
内核态的临界区一般为短时间操作,因此自旋锁的性能问题不太明显。但在用户态,临界区可能包含长时间操作,同时用户态程序无法通过中断关闭来避免更严重的资源浪费。
sequenceDiagram
participant ThreadA as 线程 A
participant Lock as 自旋锁
participant ThreadB as 线程 B
participant ThreadC as 线程 C
ThreadA->>Lock: 尝试获取锁
Lock-->>ThreadA: 锁可用,获取成功
ThreadA->>ThreadA: 进入临界区
par 自旋等待
ThreadB->>Lock: 尝试获取锁
Lock-->>ThreadB: 锁不可用,开始空转
ThreadC->>Lock: 尝试获取锁
Lock-->>ThreadC: 锁不可用,开始空转
end
Note over ThreadB,ThreadC: 不断检查锁状态<br>(空转浪费 CPU)
ThreadA->>ThreadA: 执行长时间任务或被调度出
ThreadA->>Lock: 释放锁
Lock-->>ThreadB: 锁可用,获取成功
ThreadB->>ThreadB: 进入临界区
yield
一种简单的解决方案是使用 sched_yield()
系统调用,主动让出 CPU。
1 | void yield_lock(spinlock_t *lk) { |
但是,yield
只是暂时让出 CPU,该线程仍然处于就绪状态,随时可能被再次调度。在获得锁之前,反复的「被调度 > 让出 CPU」会带来大量不必要的上下文切换。
与操作系统调度器配合
更好的解决方案是,用户态的锁操作与操作系统调度器配合:
mutex_lock(&lk)
:尝试获取锁lk
。如果失败(lk
已被持有),利用系统调用阻塞当前线程,将其加入等待锁的队列,并让出 CPU。如果成功,则进入临界区。mutex_unlock(&lk)
:释放锁。如果等待队列中有线程,则利用系统调用唤醒其中一个线程,将其从等待队列中移除,并加入就绪队列。
这样,等待锁的线程不会占用 CPU 资源,只有当锁可用时才会被唤醒。
于是操作系统需要对一个锁维护一个等待队列。
Linux Futex
然而实现没那么简单,由于操作并非原子的,唤醒信号与等待操作的时序错位,即当发送方(如生产者)在接收方(如消费者)进入等待状态之前发出唤醒信号,信号将因无等待者而被丢弃,后续接收方进入等待时可能永久阻塞,这被称为「唤醒丢失」(wakeup lost)。
Linux 提供了 futex(Fast Userspace Mutexes)系统调用,用于支持用户态的互斥锁。
futex_wait(int *address, int expected)
:首先原子地检查address
指向的值是否等于expected
。如果相等,则阻塞当前线程;否则,立即返回。futex_wake(int *address)
:唤醒一个等待address
指向的锁的线程。
基于这两个系统调用,可以实现如下互斥锁:
1 |
|
这段代码比较复杂,其正确性在于:
mutex_lock
函数只会在成功锁住lk
后才会返回。- 获得
lk
的线程本身会将lk
原子的设为ONE_HOLD
,而其他等待线程将会将lk
原子的设为WAITERS
,这使得除获得锁外的其他线程不可以在临界区,只有获得锁住线程在退出临界区调用mutex_unlock
才会将lk
设为UNLOCK
:这保证了上锁的正确性。
- 获得
- 等待的线程能够被唤醒,因为只要有等待的线程在
wait
,那么此时的lk
一定为WAITERS
。而unlock
就一定会将某个等待的线程唤醒。 - 由于
unlock
函数在wake_up
之前先将lk
设为UNLOCK
,因此上述即使发生了上述丢失wakeup
的调度,由于futex_wait
的特性,那个线程在调用futex_wait
时会发现lk
的值变了,因此不会休眠。
mutex_lock
函数的实现:
- 尝试快速获取锁
int c = cmpxchg(lk, UNLOCK, ONE_HOLD);
- 目的:原子交换操作,尝试将锁从
UNLOCK
转为ONE_HOLD
。 - 成功:若
c == UNLOCK
,当前线程获得锁,直接返回。 - 失败:锁已被其他线程持有,进入等待逻辑。
- 目的:原子交换操作,尝试将锁从
- 等待锁释放
1
2
3
4
5do {
if (c == WAITERS || cmpxchg(lk, ONE_HOLD, WAITERS) != UNLOCK) {
futex_wait(lk, WAITERS);
}
} while ((c = cmpxchg(lk, UNLOCK, WAITERS)) != UNLOCK);- 状态升级:尝试将锁状态从
ONE_HOLD
升级为WAITERS
(标记有等待者)。 - 休眠:通过
futex_wait
进入休眠,仅在锁状态为WAITERS
时阻塞。 - 唤醒后重试:被唤醒后再次尝试通过
cmpxchg
获取锁,确保只有一个等待线程成功。- 避免多个线程同时获取锁:多个被唤醒的线程可能同时尝试获取锁,通过
cmpxchg
保证只有一个线程成功。 - 锁状态保持在
WAITERS
,后续线程在尝试获取锁时,会直接进入休眠队列,而非自旋竞争。 - 防御唤醒丢失。
mutex_unlock
函数的实现:
- 避免多个线程同时获取锁:多个被唤醒的线程可能同时尝试获取锁,通过
- 状态升级:尝试将锁状态从
- 原子释放锁
1
2
3
4if (atomic_dec(lk) != ONE_HOLD) {
*lk = UNLOCK;
futex_wake(lk);
}- 原子递减:
atomic_dec
将锁状态减 1,返回操作前的旧值。 - 判断等待者:
- 若旧值为
ONE_HOLD
,表示无等待者,直接释放锁。 - 若旧值为
WAITERS
,表示有等待者,需唤醒。
- 若旧值为
- 原子递减:
- 安全唤醒
- 设置
UNLOCK
:先将锁状态设为UNLOCK
,避免唤醒后立即竞争。 - 唤醒线程:通过
futex_wake
唤醒一个等待线程。
- 设置
正确性保障:
- 互斥性
- 唯一持有者:只有成功执行
cmpxchg(lk, UNLOCK, ...)
的线程能进入临界区。 - 状态屏障:
WAITERS
状态阻止新线程通过自旋获取锁,强制进入休眠队列。
- 唯一持有者:只有成功执行
- 避免唤醒丢失
futex_wait
的健壮性:futex_wait
会检查lk
的当前值是否匹配预期值(WAITERS
)。若状态已变(例如被其他线程提前唤醒),则立即返回,避免永久阻塞。- 原子操作的顺序性:
unlock
中先设置UNLOCK
再唤醒,确保被唤醒线程能看到最新状态。
两阶段锁
Linux 的 futex 实际上是一种两阶段锁(two-phase lock):
- Fast Path:首先尝试自旋一次。如果成功获取锁,则直接进入临界区。
- Slow Path:如果自旋失败,则根据情况使用
futex_wait
阻塞自己。
这种设计结合了自旋锁和阻塞锁的优点:
- 当锁竞争不激烈时,使用自旋锁可以避免系统调用的开销。
- 当锁竞争激烈时,使用阻塞锁可以避免 CPU 资源的浪费。
POSIX 线程锁
POSIX 线程库(pthread)提供了一套标准的线程同步 API,包括互斥锁。
1 |
|
pthread_mutex_lock
实现了上述需求(很多线程的争抢下依然能保持很好的性能),使用方法和自旋锁一致,大部分情况下(在有操作系统的情况)使用它即可。
并发数据结构
线程安全(thread-safe)的数据结构是指可以被多个线程并发访问的数据结构,也称为并发数据结构(concurrent data structure)。
要实现线程安全的数据结构,通常需要在访问和更新数据结构时使用锁(一把或多把)。
并发计数器
1 | typedef struct { |
这个计数器使用了一个互斥锁来保护对 value
的访问,因此是线程安全的。
提高并发性能
上述并发计数器的实现使用了单一的锁,限制了并发性能。为了提高性能,可以采用以下方法:
- 每个 CPU 维护一个本地计数器:所有 CPU 共享一个全局计数器。
- 本地计数器使用本地锁保护:全局计数器使用全局锁保护。
- 本地计数器定期更新全局计数器:而不是每次都更新。
这样,大多数情况下只需要更新本地计数器,减少了对全局锁的竞争,提高了并发度。