Skip to main content

第 1 章 Rust 并发基础(Basics of Rust Concurrency)

早在多核处理器尚未普及之前,操作系统就已经允许一台计算机同时运行多个程序。这是通过在进程之间快速切换来实现的,使每个进程依次获得一点点执行进度。如今,几乎所有的计算机,甚至手机和手表,都配备了多核处理器,能够真正地并行执行多个进程。

操作系统会尽可能地将各个进程彼此隔离,使得一个程序在运行时可以专注于自己的工作,而完全无需知道其他进程在做什么。例如,一个进程通常不能直接访问另一个进程的内存,也不能以任何方式与其通信,除非先请求操作系统内核的帮助。

不过,一个程序可以在同一进程内创建额外的执行线程。处于同一进程中的线程并不会彼此隔离,它们共享内存,并且可以通过这块共享内存相互交互。

本章将介绍如何在 Rust 中创建线程,以及与线程相关的所有基础概念,例如如何在多个线程之间安全地共享数据。本章中解释的概念是全书其余内容的基础。

如果你已经熟悉 Rust 的这些部分,可以直接跳到后面的章节。不过,在继续阅读下一章之前,请务必确保你已经很好地理解了以下内容:线程、内部可变性(interior mutability)、SendSync,以及什么是互斥锁(mutex)、条件变量(condition variable)和线程挂起(thread parking)。

Rust 中的线程 (Threads in Rust)

每个程序都恰好从一个线程开始:主线程(main thread)。这个线程会执行你的 main 函数,并且在需要时可以用来创建更多的线程。

在 Rust 中,新线程是通过标准库里的 std::thread::spawn 函数创建的。它只接受一个参数:新线程要执行的函数。当这个函数返回时,线程也就随之结束。

我们来看一个示例:

use std::thread;

fn main() {
thread::spawn(f);
thread::spawn(f);
println!("Hello from the main thread.");
}

fn f() {
println!("Hello from another thread!");
let id = thread::current().id();
println!("This is my thread id: {id:?}");
}

这里我们创建了两个线程,它们都会以 f 作为各自的主函数来执行。这两个线程都会打印一条消息并显示它们的线程 ID,而主线程也会打印一条自己的消息。

线程 ID

Rust 标准库会为每个线程分配一个唯一的标识符。这个标识符可以通过 Thread::id() 获取,其类型是 ThreadId。你几乎不能对 ThreadId 做太多事情,除了复制它以及进行相等性比较。标准库并不保证这些 ID 是连续分配的,只保证每个线程的 ID 都彼此不同。

如果你多运行几次上面的示例程序,可能会注意到输出在不同运行之间会有所差异。下面是我在某一次运行时,在自己机器上得到的输出结果:

Hello from the main thread.
Hello from another thread!
This is my thread id:

令人意外的是,输出内容似乎缺失了一部分。

刚才发生的情况是:主线程在新创建的线程还没有执行完各自的函数之前,就已经执行完了 main 函数。 一旦从 main 返回,整个程序就会退出,即使此时仍然有其他线程在运行。

在这个具体的示例中,其中一个新创建的线程只来得及把第二条消息打印到一半,程序就被主线程终止了。

如果我们希望在从 main 返回之前确保所有线程都已经执行完成,就需要等待它们结束,也就是对它们进行 join。为此,我们必须使用 spawn 函数返回的 JoinHandle

fn main() {
let t1 = thread::spawn(f);
let t2 = thread::spawn(f);
println!("Hello from the main thread.");
t1.join().unwrap();
t2.join().unwrap();
}

.join() 方法会一直等待,直到对应的线程执行完毕,并返回一个 std::thread::Result。 如果线程因为发生 panic 而没有成功完成函数执行,这个结果中就会包含 panic 的信息。我们可以尝试对这种情况进行处理,或者像这里一样直接调用 .unwrap(),在尝试 join 一个发生 panic 的线程时让当前线程也 panic。

运行这个版本的程序后,就不会再出现输出被截断的情况了:

Hello from the main thread.
Hello from another thread!
This is my thread id: ThreadId(3)
Hello from another thread!
This is my thread id: ThreadId(2)

唯一仍然会在多次运行之间发生变化的,是这些消息被打印出来的顺序,例如:

Hello from the main thread.
Hello from another thread!
Hello from another thread!
This is my thread id: ThreadId(2)
This is my thread id: ThreadId(3)

输出锁定

println! 宏会使用 std::io::Stdout::lock() 来确保输出过程不会被打断。在真正写入任何内容之前,一个 println!() 表达式会等待所有正在并发执行的 println!() 完成。 如果不是这样,我们就可能看到更加交错、难以阅读的输出,例如:

Hello fromHello from another thread!

another This is my threthreadHello fromthread id: ThreadId!

( the main thread.

2)This is my thread

id: ThreadId(3)

相比前面示例中直接把函数名传给 std::thread::spawn,更常见的做法是向它传入一个闭包。这样我们就可以捕获一些值,并将它们移动到新线程中:

let numbers = vec![1, 2, 3];
thread::spawn(move || {
for n in numbers {
println!("{n}");
}
}).join().unwrap();

在这里,由于使用了 move 闭包,numbers 的所有权被转移给了新创建的线程。如果没有使用 move 关键字,闭包就会通过引用来捕获 numbers。这会导致编译错误,因为新线程的生命周期可能会长于该变量本身。

由于一个线程可能会一直运行到程序执行的最后,spawn 函数对其参数类型施加了 'static 生命周期约束。换句话说,它只接受那些可以被长期保存的函数或闭包。而一个通过引用捕获局部变量的闭包,就无法满足这一要求——一旦该局部变量离开作用域,这个引用立刻就会失效。

要从线程中取回一个值,可以通过在闭包中返回该值来实现。这个返回值可以从 join 方法返回的 Result 中获取:

let numbers = Vec::from_iter(0..=1000);
let t = thread::spawn(move || {
let len = numbers.len();
let sum = numbers.into_iter().sum::<usize>();
sum / len
});
let average = t.join().unwrap();
println!("average: {average}");

在这里,线程闭包返回的值会通过 join 方法传回主线程。 如果 numbers 为空,那么线程在尝试做除零运算时就会发生 panic,而 join 会返回这个 panic 信息;由于我们对其调用了 unwrap(),主线程也会因此发生 panic。

线程构建器(Thread Builder)

std::thread::spawn 函数实际上只是std::thread::Builder::new().spawn().unwrap() 这一写法的一个便捷封装。

std::thread::Builder 允许你在创建线程之前对其进行一些配置。你可以用它来设置新线程的栈大小,也可以为新线程指定一个名字。 线程的名字可以通过 std::thread::current().name() 获取,它会出现在 panic 信息中,并且在大多数平台上的监控和调试工具里都是可见的。

此外,Builderspawn 方法会返回一个 std::io::Result,从而允许你显式地处理线程创建失败的情况。这种失败可能发生在操作系统内存不足,或者你的程序被施加了资源限制时。 相比之下,std::thread::spawn 在无法创建新线程时会直接 panic。

作用域线程(Scoped Threads)

如果我们能够确定,一个被创建的线程一定不会比某个作用域活得更久,那么这个线程就可以安全地借用一些并非“永久存在”的东西,比如局部变量——只要这些被借用的值至少能活到该作用域结束为止。

Rust 标准库提供了 std::thread::scope 函数,用来创建这种受作用域限制的线程。它允许我们创建一些线程,并保证这些线程绝不会活过传入该函数的闭包作用域,从而使安全地借用局部变量成为可能。

它的工作方式通过一个示例最容易理解:

let numbers = vec![1, 2, 3];

thread::scope(|s| { //(1)
s.spawn(|| { //(2)
println!("length: {}", numbers.len());
});
s.spawn(|| { //(2)
for n in &numbers {
println!("{n}");
}
});
}); //(3)

(1) 我们调用 std::thread::scope,并向它传入一个闭包。这个闭包会被立即执行,并且接收一个参数 s,表示当前的作用域。

(2) 我们使用 s 来创建线程。这里传入的闭包可以直接借用像 numbers 这样的局部变量。

(3) 当作用域结束时,所有尚未被 join 的线程都会被自动 join

这种模式保证了:在该作用域内创建的所有线程,都不可能比这个作用域活得更久。正因为如此,这种作用域内的 spawn 方法不要求其参数类型满足 'static 约束,从而允许我们引用任何生命周期至少覆盖该作用域的值,例如这里的 numbers

在上面的示例中,两个新线程都在并发地访问 numbers。这完全没有问题,因为它们(以及主线程)都没有对其进行修改。但如果我们把第一个线程改成去修改 numbers,如下所示,编译器就不会再允许我们创建另一个同样使用 numbers 的线程:

let mut numbers = vec![1, 2, 3];

thread::scope(|s| {
s.spawn(|| {
numbers.push(1);
});
s.spawn(|| {
numbers.push(2); // 错误!
});
});

具体的错误信息会随着 Rust 编译器版本的不同而有所变化(因为编译器一直在改进诊断信息),但尝试编译上述代码时,通常会得到类似下面的错误:

error[E0499]: cannot borrow `numbers` as mutable more than once at a time
--> example.rs:7:13
|
4 | s.spawn(|| {
| -- 第一次可变借用发生在这里
5 | numbers.push(1);
| ------- 第一次借用是由于在闭包中使用了 `numbers`
|
7 | s.spawn(|| {
| ^^ 第二次可变借用发生在这里
8 | numbers.push(2);
| ------- 第二次借用是由于在闭包中使用了 `numbers`

Leakpocalypse(“泄漏末日”)

在 Rust 1.0 之前,标准库中曾经有一个名为 std::thread::scoped 的函数,它的行为和 std::thread::spawn 类似,都会直接创建一个线程。 不同之处在于,它允许捕获非 'static 的值,因为它返回的不是 JoinHandle,而是一个 JoinGuard:当这个 JoinGuard 被丢弃(drop)时,线程就会被自动 join。因此,任何被借用的数据只需要活到这个 JoinGuard 被丢弃为止即可。只要 JoinGuard 最终一定会被丢弃,这看起来似乎是安全的。

然而,在 Rust 1.0 发布前不久,人们逐渐意识到:根本无法保证某个值一定会被丢弃。 存在很多方式——例如构造一个由引用计数节点组成的环——可以让某个值被“遗忘”或泄漏,而不会触发它的 drop

最终,在后来被一些人称为 “Leakpocalypse” 的事件中,大家得出了一个结论:一个(安全的)接口设计,不能依赖“对象最终一定会被 drop”这一假设。 泄漏一个对象,合理的后果可以是连带泄漏更多对象(例如泄漏一个 Vec 也会泄漏其中的元素),但绝不能导致未定义行为(undefined behavior)。

基于这一结论,std::thread::scoped 被认为是不安全的,并最终从标准库中移除。同时,std::mem::forget 也从一个 unsafe 函数升级为安全函数,用以强调这样一个事实:遗忘(或泄漏)某个值始终是可能发生的事情

直到很久之后,在 Rust 1.63 中,一个采用全新设计的 std::thread::scoped(也就是如今的 std::thread::scope)才被重新引入标准库,而这一次的设计不再依赖 Drop 来保证正确性。

共享所有权与引用计数(Shared Ownership and Reference Counting)

到目前为止,我们已经看过两种在线程间处理数据的方式:

  1. 使用 move 闭包将值的所有权转移给线程(参见“Rust 中的线程”一节);
  2. 从生命周期更长的父线程借用数据(参见“作用域线程”一节)。

但是,如果要在两个线程之间共享数据,而无法保证其中任何一个线程会比另一个线程活得更久,那么它们都不能成为数据的所有者。这种情况下,共享的数据必须活得足够长——至少比所有线程中存活时间最长的那个线程还要久。


静态变量(Statics)

创建不属于单一线程的最简单方法,是使用 静态值static)。静态值属于整个程序,而不是某个线程。 例如:

static X: [i32; 3] = [1, 2, 3];
thread::spawn(|| dbg!(&X));
thread::spawn(|| dbg!(&X));

静态变量有一个常量初始化器,永远不会被丢弃,并且在程序 main 函数执行之前就已经存在。每个线程都可以借用它,因为它保证总是存在


内存泄漏(Leaking)

另一种共享所有权的方式是通过“泄漏”一个分配。使用 Box::leak,我们可以释放一个 Box 的所有权,并承诺永远不去 drop 它。从那一刻起,这个 Box 会永远存在,没有所有者,允许任何线程在程序运行期间随意借用它:

let x: &'static [i32; 3] = Box::leak(Box::new([1, 2, 3]));
thread::spawn(move || dbg!(x));
thread::spawn(move || dbg!(x));

虽然闭包使用了 move,看起来像是把所有权移动到了线程中,但仔细看 x 的类型,会发现我们只是给线程提供了对数据的引用。

引用类型是 Copy 的,也就是说即使“移动”它们,原始引用依然存在,就像整数或布尔值一样。

注意 'static 生命周期并不意味着这个值从程序开始就存在,而只是意味着它会一直活到程序结束。过去的存在与否并不重要。

缺点:泄漏 Box 会造成内存泄漏。我们分配了内存,但永远不释放它。如果这种情况发生次数有限,还可以接受;但如果持续这样做,程序会慢慢耗尽内存。


引用计数(Reference Counting)

为了确保共享的数据能在不再使用时被正确释放,我们不能完全放弃它的所有权,而是可以共享所有权。通过记录有多少个所有者,可以保证只有在所有者数量为零时才释放值。

Rust 标准库通过 std::rc::Rc(Reference Counted)提供了这种功能。它和 Box 很相似,但克隆它不会创建新的分配,而是增加存储在值旁边的计数器。原始的 Rc 与克隆的 Rc 都指向同一块内存,共享所有权:

use std::rc::Rc;

let a = Rc::new([1, 2, 3]);
let b = a.clone();
assert_eq!(a.as_ptr(), b.as_ptr()); // 指向同一块内存!

当一个 Rc 被丢弃时,计数器会减少。只有最后一个 Rc(计数器变为零时)会释放并回收内存。

线程安全问题:如果尝试把 Rc 发送到另一个线程,会报错:

error[E0277]: `Rc` cannot be sent between threads safely
8 | thread::spawn(move || dbg!(b));

Rc 并非线程安全。如果多个线程同时持有同一块 Rc,它们可能同时修改引用计数器,导致不可预测的结果。

原子引用计数(Arc)

为了在线程间安全共享引用计数,需要使用 std::sync::Arc(Atomically Reference Counted)。它与 Rc 完全相同,但保证对计数器的修改是原子操作,从而线程安全:

use std::sync::Arc;

let a = Arc::new([1, 2, 3]);
let b = a.clone();

thread::spawn(move || dbg!(a));
thread::spawn(move || dbg!(b));
  • 我们把数组和计数器放在一起,初始计数为 1。
  • 克隆 Arc 会把计数器增加到 2,并得到指向同一内存的第二个 Arc
  • 每个线程都得到自己的 Arc,在丢弃时会递减计数器。
  • 最后一个丢弃的线程会把计数器减到 0,并释放数组。

克隆命名(Naming Clones)

如果必须为每个 Arc 的克隆赋予不同的名字,代码很快就会变得杂乱且难以阅读。虽然每个克隆都是独立的对象,但它们代表的是同一个共享值,用不同名字并不能很好地反映这一点。 Rust 允许(并鼓励)通过**变量遮蔽(shadowing)**来复用同一个名字:可以在相同作用域内>定义一个同名的新变量,此时原来的变量就不能再被使用。如果在新的作用域内执行类似 let a = a.clone(); 的语句,就可以在该作用域中重复使用同一个名字,同时保持外部原始变量仍可用。 通过在闭包外包裹一个新的作用域(用 {}),我们可以在将变量移动到闭包之前克隆它,而无需重新命名:

let a = Arc::new([1, 2, 3]);
let b = a.clone();

thread::spawn(move || {
dbg!(b);
});
dbg!(a);

在这个例子中,Arc 的克隆仍然在相同作用域中存在,每个线程都有自己单独的克隆,并且使用>不同名字。 另一种方式是在不同作用域中克隆:

let a = Arc::new([1, 2, 3]);

thread::spawn({
let a = a.clone(); // 新作用域中的克隆,遮蔽原变量
move || { dbg!(a); }
});

dbg!(a); // 外部的原始变量仍然可用

此时,克隆的 Arc 存活在不同作用域中,我们可以在每个线程中使用同样的名字

由于所有权是共享的,引用计数指针(Rc<T>Arc<T>)与共享引用(&T)有相同的限制:它们不允许修改所包含的值,因为该值可能正被其他代码借用。

例如,尝试对 Arc<[i32]> 内的切片进行排序时,编译器会阻止:

error[E0596]: cannot borrow data in an `Arc` as mutable
6 | a.sort();
| ^^^^^^^^

这保证了共享数据在多线程或多处借用场景下的安全性。

借用与数据竞争(Borrowing and Data Races)

在 Rust 中,值可以通过两种方式被借用:

不可变借用(Immutable borrowing) 使用 & 对某个值进行借用会得到一个不可变引用。这种引用可以被复制。所有这些引用的副本共享对数据的访问。正如名字所示,编译器通常不允许通过不可变引用去修改数据,因为这可能会影响正在同时借用同一数据的其他代码。

可变借用(Mutable borrowing) 使用 &mut 对某个值进行借用会得到一个可变引用。可变借用保证它是该数据当前唯一的活动借用。这确保了对数据的修改不会影响其他正在访问该数据的代码。

这两种机制共同完全防止了数据竞争:即一个线程在修改数据时,另一个线程同时访问它的情况。数据竞争通常会导致未定义行为(undefined behavior),意味着编译器不需要考虑这些情况,它会假设这种情况永远不会发生。

举个例子,来说明编译器如何利用借用规则做出合理假设:

fn f(a: &i32, b: &mut i32) {
let before = *a;
*b += 1;
let after = *a;
if before != after {
x(); // 永远不会发生
}
}

这里,a 是对整数的不可变引用,我们在修改 b 所指的整数前后分别读取 a 的值。编译器可以合理地假设借用规则和数据竞争规则得到了遵守,这意味着 b 不可能指向与 a 相同的整数。实际上,在 a 借用期间,程序中没有任何代码可以可变借用 a 指向的整数。因此,编译器可以得出结论:*a 不会改变,if 条件永远为假,从而可以完全删除对 x() 的调用作为优化。

除非使用 unsafe 块绕过编译器的安全检查,否则不可能写出违反编译器假设的 Rust 程序。

未定义行为(Undefined Behavior)

像 C、C++ 和 Rust 这样的语言都有一套规则,需要遵守,否则会产生所谓的未定义行为。例如,Rust 的规则之一是:对任何对象,最多只能存在一个可变引用

在 Rust 中,只有使用 unsafe 代码时才可能违反这些规则。 “unsafe”并不意味着代码错误或永远不安全,而是编译器不会为你验证代码是否安全。如果代码确实违反了规则,则称为 不安全(unsound)

编译器可以假设这些规则永远不会被破坏,而无需检查。如果规则被破坏,就会产生未定义行为,这必须尽力避免。如果允许编译器基于不真实的假设进行优化,很容易导致程序其他部分产生错误结论,从而影响整个程序。

举个具体例子,使用切片的 get_unchecked 方法:

let a = [123, 456, 789];
let b = unsafe { a.get_unchecked(index) };

get_unchecked 会返回切片中指定索引的元素,就像 a[index] 一样,但它允许编译器假设索引始终在有效范围内,不进行任何检查。

这意味着,在此代码中,因为 a 的长度是 3,编译器可能假设 index < 3。确保这个假设成立是我们的责任。 如果我们破坏了这个假设,例如使用 index = 3,可能发生的情况包括:

  • 读取内存中 a 后面的任意数据;
  • 程序崩溃;
  • 执行程序中完全无关的代码;
  • 产生各种不可预期的后果。

令人惊讶的是,未定义行为甚至可能“逆向传播”,影响先前的代码。例如,假设在前面有一个 match 语句:

match index {
0 => x(),
1 => y(),
_ => z(index),
}

let a = [123, 456, 789];
let b = unsafe { a.get_unchecked(index) };

由于 unsafe,编译器可以假设 index 只可能是 0、1 或 2,它可能推断 match 的最后一>个分支 _ 只会匹配 2,从而假设 z 只会被调用为 z(2)。这一结论不仅可以优化 match,也可能优化 z 本身,包括丢弃未使用的代码。

如果实际运行时 index = 3,程序可能会尝试执行被优化掉的部分,从而导致完全不可预测的行为,远在我们到达 unsafe 块之前就可能出错。这就是未定义行为可能在程序中前后传播的原因,通常非常难以预料。

使用 unsafe 函数时,一定要仔细阅读文档,确保完全理解其安全要求:即作为调用者,需要>遵守哪些假设才能避免未定义行为。

内部可变性(Interior Mutability)

前一节介绍的借用规则很简单,但在某些情况下也非常受限——尤其是涉及多线程时。遵循这些规则会极大限制线程之间的通信,几乎无法修改多个线程都可访问的数据。

幸运的是,Rust 提供了一条“逃生通道”:内部可变性(interior mutability)。具有内部可变性的类型会对借用规则做出一定的“弯曲”,在某些条件下允许通过“不可变”引用进行修改。

在“引用计数(Reference Counting)”一节中,我们已经看到一个涉及内部可变性的微妙例子:RcArc 会修改引用计数器,即使存在多个克隆使用同一个计数器。

一旦涉及内部可变类型,称引用为“不可变”或“可变”就会显得混淆甚至不准确,因为有些数据可以通过两者修改。更准确的说法是共享(shared)独占(exclusive)

  • 共享引用(&T)可以复制并与其他人共享;
  • 独占引用(&mut T)保证它是该值唯一的独占借用。

对于大多数类型,共享引用不允许修改,但存在例外。本书大部分内容将处理这些例外,因此后续章节将使用更准确的术语。

请注意,内部可变性只是对共享借用规则的放宽,以允许在共享情况下进行修改。它不会改变独占借用的规则。独占借用仍然保证没有其他活跃借用。任何导致同一数据存在多个活跃独占引用的 unsafe 代码都会引发未定义行为,无论是否涉及内部可变性。

让我们来看几种具有内部可变性的类型,以及它们如何在不引发未定义行为的情况下,通过共享引用实现数据修改。

Cell

std::cell::Cell<T> 简单地包装一个 T,但允许通过共享引用修改其内容。为了避免未定义行为,它只允许整体替换值,或者如果 T 实现了 Copy,可以复制出值。此外,它只能在单线程中使用。

示例(类似上一节,但使用 Cell<i32>):

use std::cell::Cell;

fn f(a: &Cell<i32>, b: &Cell<i32>) {
let before = a.get();
b.set(b.get() + 1);
let after = a.get();
if before != after {
x(); // 现在可能发生
}
}

与上一次不同,现在 if 条件可能为真。由于 Cell<i32> 具有内部可变性,编译器不再能假设在存在共享引用时其值不会改变。ab 可能指向同一个值,因此通过 b 的修改可能会影响 a。不过,编译器仍然可以假设没有其他线程同时访问这些 Cell

使用 Cell 的限制并不总是容易处理。由于它不能直接借用其持有的值,我们需要先将值取出(留下一个占位值),修改后再放回,以此来修改其内容:

fn f(v: &Cell<Vec<i32>>) {
let mut v2 = v.take(); // 用空 Vec 替换原内容
v2.push(1);
v.set(v2); // 放回修改后的 Vec
}

RefCell

Cell 不同,std::cell::RefCell<T> 允许借用其内容,但有一定的运行时开销。RefCell<T> 不仅包含 T,还包含计数器,用于追踪当前借用状态。如果试图在已有可变借用时再次借用(或反之),会触发 panic,从而避免未定义行为。RefCell 也只能在单线程中使用。

借用内容:

use std::cell::RefCell;

fn f(v: &RefCell<Vec<i32>>) {
v.borrow_mut().push(1); // 可以直接修改 Vec
}

Mutex 和 RwLock

RwLock(读写锁)是 RefCell 的并发版本。RwLock<T> 持有 T 并跟踪活跃借用,但不同于 RefCell,它不会 panic,而是阻塞当前线程,等待冲突借用消失。通过锁定(locking)内容,可以安全借用而避免数据竞争。

Mutex 类似,但概念上更简单,只允许独占借用。

更多内容见“锁:Mutex 和 RwLock”。

原子类型(Atomics)

原子类型是 Cell 的并发版本,是第 2、3 章的重点。它们避免未定义行为,通过整体复制值,而不允许直接借用内容。

原子类型受尺寸限制,没有通用的 Atomic<T>,只有特定类型如 AtomicU32AtomicPtr<T>。它们常用于协调多个线程访问较大数据。

UnsafeCell

UnsafeCell 是内部可变性的基础构建块。UnsafeCell<T> 包装一个 T,但不提供任何防止未定义行为的约束。get() 方法返回原始指针,只能在 unsafe 块中使用。

大多数情况下,UnsafeCell 并不直接使用,而是被封装在提供安全接口的类型中,如 CellMutex。所有具有内部可变性的类型都是基于 UnsafeCell 构建的。

线程安全:Send 和 Sync (Thread Safety: Send and Sync)

在本章中,我们已经看到一些类型并非线程安全,只能在单线程中使用,例如 RcCell 等。为了避免未定义行为,这种限制需要由编译器理解并检查,这样你就可以安全地使用这些类型,而无需使用 unsafe 块。

Rust 使用两个特殊的 trait 来追踪哪些类型可以安全地在多线程中使用:

Send

如果一个类型可以被发送到另一个线程,它就是 Send 类型。换句话说,如果该类型的值的所有权可以转移到另一个线程,它就是 Send。例如,Arc<i32> 是 Send,而 Rc<i32> 不是。

Sync

如果一个类型可以被多个线程共享,它就是 Sync 类型。换句话说,类型 T 是 Sync 当且仅当对该类型的共享引用 &T 是 Send。例如,i32 是 Sync,而 Cell<i32> 不是。(但 Cell<i32> 是 Send。)

所有原始类型(如 i32boolstr)都是 Send 和 Sync。

Send 和 Sync 都是 自动 trait,这意味着它们会基于字段自动为你的类型实现。如果一个结构体的所有字段都是 Send 和 Sync,那么结构体本身也是 Send 和 Sync。

想要“取消”某个 trait,只需在类型中加入一个不实现该 trait 的字段。为此,特殊类型 std::marker::PhantomData<T> 非常有用。编译器会把它当作 T 处理,但它在运行时实际上不存在,是零大小类型,不占空间。

示例:

use std::marker::PhantomData;

struct X {
handle: i32,
_not_sync: PhantomData<Cell<()>>,
}

在这个例子中,如果 handle 是唯一字段,X 将既是 Send 又是 Sync。但我们加入了一个零大小的 PhantomData<Cell<()>> 字段,它会被视作 Cell<()>。由于 Cell<()> 不是 Sync,X 也不是 Sync,但仍然是 Send,因为它的所有字段都是 Send。

原始指针(*const T*mut T)既不是 Send 也不是 Sync,因为编译器无法判断它们所代表的数据。

实现 Send 或 Sync 与实现其他 trait 相同,只需使用 impl 块即可:

struct X {
p: *mut i32,
}

unsafe impl Send for X {}
unsafe impl Sync for X {}

注意,实现这些 trait 需要 unsafe 关键字,因为编译器无法验证你的实现是否正确。这是你对编译器的承诺,编译器会信任你。

如果尝试将非 Send 类型发送到另一个线程,编译器会阻止你:

fn main() {
let a = Rc::new(123);
thread::spawn(move || { // Error!
dbg!(a);
});
}

这里我们尝试将 Rc<i32> 发送到新线程,但 Rc<i32> 不像 Arc<i32> 那样实现 Send。

编译错误示例:

error[E0277]: `Rc<i32>` cannot be sent between threads safely
--> src/main.rs:3:5
3 | thread::spawn(move || {
| ^^^^^^^^^^^^^ `Rc<i32>` cannot be sent between threads safely
= help: within `[closure]`, the trait `Send` is not implemented for `Rc<i32>`
note: required because it's used within this closure --> src/main.rs:3:19
3 | thread::spawn(move || {|
| ^^^^^^^
note: required by a bound in `spawn`

thread::spawn 要求其参数是 Send,且闭包只有在所有捕获值都是 Send 时才是 Send。如果尝试捕获非 Send 类型,编译器会报错,从而保护我们避免未定义行为。

加锁:Mutex 与 RwLock(Locking: Mutexes and RwLocks)

在线程之间共享(可变)数据时,最常用的工具是互斥锁(mutex),这是 mutual exclusion(互斥)的缩写。互斥锁的作用是通过暂时阻塞同时尝试访问数据的其他线程,来确保某一时刻只有一个线程能独占地访问某份数据。

从概念上讲,互斥锁只有两种状态:已锁定未锁定。当一个线程对一个未锁定的互斥锁加锁时,该互斥锁会被标记为已锁定,线程可以立刻继续执行。如果一个线程尝试对已经被锁定的互斥锁加锁,该操作就会被阻塞。线程会被挂起,直到互斥锁被解锁为止。只有在互斥锁处于锁定状态时才能解锁,并且解锁应当由加锁的同一个线程来完成。如果有其他线程正在等待锁,在解锁时,其中一个等待线程会被唤醒,从而再次尝试获取锁并继续执行。

用互斥锁保护数据,本质上是线程之间的一种约定:所有线程都只在持有互斥锁时才访问该数据。这样就能保证不会有两个线程同时访问同一份数据,从而避免数据竞争。

Rust 中的 Mutex

Rust 标准库通过 std::sync::Mutex<T> 提供了这种功能。它是一个对类型 T 泛型的结构体,其中 T 就是被互斥锁保护的数据类型。通过把数据本身作为互斥锁的一部分,Rust 能够强制数据只能通过互斥锁访问,从而提供一个安全的接口,保证所有线程都遵守约定。

为了确保只有加锁的线程才能解锁,Mutex 并没有 unlock() 方法。相反,lock() 方法会返回一个特殊的类型,叫做 MutexGuard。这个 guard 表示“我们已经持有了锁”的保证。它通过实现 DerefMut trait,表现得就像一个对受保护数据的独占引用,使我们能够独占地访问数据。解锁是通过丢弃(drop)这个 guard 来完成的。当 guard 被丢弃时,我们失去了访问数据的能力,而 guard 的 Drop 实现会自动解锁互斥锁。

下面通过一个示例来看互斥锁的实际使用:

use std::sync::Mutex;

fn main() {
let n = Mutex::new(0);

thread::scope(|s| {
for _ in 0..10 {
s.spawn(|| {
let mut guard = n.lock().unwrap();
for _ in 0..100 {
*guard += 1;
}
});
}
});
assert_eq!(n.into_inner().unwrap(), 1000);
}

这里我们创建了一个 Mutex<i32>,即一个保护整数的互斥锁,并启动了 10 个线程,每个线程将该整数递增 100 次。每个线程都会先锁住互斥锁以获取一个 MutexGuard,然后通过该 guard 访问并修改整数。随后,当 guard 变量离开作用域时,会被隐式丢弃,从而自动解锁。

在线程全部完成后,我们可以通过 into_inner() 安全地移除对整数的保护。into_inner() 会取得互斥锁的所有权,这保证了此时不会再有任何地方持有对该互斥锁的引用,因此也就不再需要加锁。

尽管递增操作是一步一步进行的,但任何线程在观察该整数时,只会看到 100 的倍数,因为线程只能在互斥锁未被占用时才能访问该整数。实际上,由于互斥锁的存在,这一百次递增被合并成了一个不可分割的——原子性的——操作

为了更清楚地看到互斥锁的效果,我们可以让每个线程在解锁之前等待一秒钟:

use std::time::Duration;

fn main() {
let n = Mutex::new(0);

thread::scope(|s| {
for _ in 0..10 {
s.spawn(|| {
let mut guard = n.lock().unwrap();
for _ in 0..100 {
*guard += 1;
}
thread::sleep(Duration::from_secs(1)); // 新增
});
}
});

assert_eq!(n.into_inner().unwrap(), 1000);
}

运行这个程序时,你会发现它大约需要 10 秒 才能完成。虽然每个线程只等待 1 秒,但互斥锁保证了同一时间只能有一个线程进入这段代码。

如果我们在休眠前主动丢弃 guard(也就是提前解锁),就会看到并行执行的效果:

fn main() {
let n = Mutex::new(0);

thread::scope(|s| {
for _ in 0..10 {
s.spawn(|| {
let mut guard = n.lock().unwrap();
for _ in 0..100 {
*guard += 1;
}
drop(guard); // 新增:在休眠前丢弃 guard
thread::sleep(Duration::from_secs(1));
});
}
});

assert_eq!(n.into_inner().unwrap(), 1000);
}

现在,这个程序只需要 约 1 秒 就能完成,因为 10 个线程可以同时进行各自的一秒休眠。这说明了一个非常重要的原则:互斥锁的持有时间应尽可能短。如果锁被持有的时间超过必要范围,任何并行化的好处都会被完全抵消,最终导致所有操作都被串行执行。

锁中毒(Lock Poisoning)

前面示例中多次出现的 unwrap()锁中毒有关。

在 Rust 中,如果某个线程在持有互斥锁时发生 panic,该互斥锁就会被标记为“中毒”。发生这种情况后,互斥锁本身并不会一直处于锁定状态,但之后调用 lock() 会返回一个 Err,以表明该锁已经被中毒。

这是为了防止互斥锁所保护的数据处于不一致状态。在前面的示例中,如果某个线程在递增整数尚未完成 100 次时发生 panic,互斥锁会被解锁,但整数就会停留在一个非 100 倍数的状态,从而可能破坏其他线程的假设。自动标记锁为中毒,迫使使用者显式处理这种情况。

即便互斥锁已经中毒,调用 lock() 仍然会成功加锁。lock() 返回的 Err 中包含 MutexGuard,允许我们在必要时修复不一致的数据。

尽管锁中毒看起来是一个强大的机制,但在实际中,很少有人真的去恢复数据状态。大多数代码要么忽略中毒状态,要么直接使用 unwrap(),在锁被中毒时继续 panic,从而把 panic 传播给所有使用该互斥锁的代码。>

MutexGuard 的生命周期

guard 被隐式丢弃时自动解锁这一特性虽然很方便,但有时也会导致一些微妙的问题。如果我们使用 let 语句为 guard 绑定一个变量名(就像前面的例子那样),通常比较容易判断它何时被丢弃,因为局部变量会在其作用域结束时被销毁。但如果不显式地提前丢弃 guard,就可能无意中让互斥锁保持锁定状态更久,正如前面示例所展示的那样。

也可以不为 guard 绑定变量名,而直接使用它。由于 MutexGuard 的行为类似于对受保护数据的独占引用,我们可以直接使用它。例如,对于一个 Mutex<Vec<i32>>,可以在一条语句中完成加锁、修改和解锁:

list.lock().unwrap().push(1);

在一个完整语句中产生的临时值(例如 lock() 返回的 guard),会在该语句结束时被丢弃。 这看起来很合理,但在涉及 matchif letwhile let 时,常常会踩坑。例如:

if let Some(item) = list.lock().unwrap().pop() {
process_item(item);
}

如果我们的本意是:锁住列表 → 弹出一个元素 → 解锁列表 → 再处理该元素,那么这里其实犯了一个细微但重要的错误。

这个临时 guard 并不会在 pop() 后立即被丢弃,而是要等到整个 if let 语句结束才会被丢弃。这意味着我们在处理 item 时,仍然持有锁。

有些出人意料的是,类似的普通 if 语句却不会有这个问题:

if list.lock().unwrap().pop() == Some(1) {
do_something();
}

在这里,临时 guard 会在进入 if 语句体之前就被丢弃。原因在于普通 if 的条件始终是一个纯布尔值,不能借用任何数据,因此没有理由延长临时值的生命周期。而对于 if let,情况就不同了。如果我们用的是 front() 而不是 pop(),那么 item 会借用列表中的数据,这就必须保持 guard 存在。 由于借用检查器只负责检查借用是否合法,并不会影响值何时或以什么顺序被丢弃,因此即使使用的是 pop()(并不需要借用),也会发生同样的行为。

解决方法是:pop() 操作单独放到一个 let 语句中,这样 guard 会在该语句结束时立即被丢弃:

let item = list.lock().unwrap().pop();
if let Some(item) = item {
process_item(item);
}

读写锁(Reader-Writer Lock)

互斥锁只关注独占访问MutexGuard 始终提供对受保护数据的独占引用(&mut T),即使我们只是想读取数据,用共享引用(&T)就足够了。

读写锁是互斥锁的一个稍复杂的版本,它能够区分独占访问和共享访问,并分别提供支持。 它有三种状态:

  1. 未锁定
  2. 被一个写者锁定(独占访问)
  3. 被任意数量的读者锁定(共享访问)

读写锁通常用于读操作频繁、写操作较少的数据结构。

Rust 标准库通过 std::sync::RwLock<T> 提供读写锁。它与 Mutex 类似,但接口被分成了两部分:不再只有一个 lock() 方法,而是提供了 read()write(),分别用于获取读锁和写锁。它有两种 guard 类型:RwLockReadGuardRwLockWriteGuard。前者只实现了 Deref,表现为对数据的共享引用;后者还实现了 DerefMut,表现为独占引用。

本质上,RwLockRefCell 的多线程版本,在运行时动态追踪借用数量,以确保借用规则不被破坏。

Mutex<T>RwLock<T> 都要求 T 实现 Send,因为它们可以被用来将 T 发送到其他线程。 此外,RwLock<T> 还要求 T 实现 Sync,因为它允许多个线程同时持有对受保护数据的共享引用(&T)。 (严格来说,你可以创建一个不满足这些约束的锁,但由于锁本身不会实现 Sync,因此你无法在多线程之间共享它。)

Rust 标准库只提供了一种通用的 RwLock,但其具体实现依赖于操作系统。不同的读写锁实现之间存在许多细微差别。大多数实现都会在有写者等待时阻止新的读者进入,即使锁当前已经被读者持有。这是为了防止写者饥饿:即大量读者持续占用锁,导致写者永远无法更新数据。

其他语言中的 Mutex

Rust 标准库中的 MutexRwLock 与 C、C++ 等语言中的同类工具看起来有些不同。 最大的区别在于:Rust 的 Mutex<T> 内部直接包含被保护的数据。 例如,在 C++ 中,std::mutex 并不包含它所保护的数据,甚至并不知道自己在保护什么。这意味着,使用者必须自己记住哪些数据由哪个互斥锁保护,并在每次访问“受保护数据”时确保锁已经被正确地加锁。 在阅读其他语言中涉及互斥锁的代码,或与不熟悉 Rust 的程序员交流时,这一点尤其值得注意。 Rust 程序员可能会说“互斥锁里的数据”或“把它包进一个 mutex 里”,这对只熟悉其他语言互斥锁的人来说可能会感到困惑。 如果你确实需要一个不包含任何数据的独立互斥锁(例如用于保护某些外部硬件),可以使用 Mutex<()>。 但即便如此,通常更好的做法是定义一个(可能是零大小的)类型来表示该硬件接口,然后将其包裹在 Mutex 中。这样,Rust 仍然会强制你在与硬件交互之前先获取锁。

等待:线程挂起(Parking)与条件变量(Waiting: Parking and Condition Variables)

当数据被多个线程修改时,常常会出现这样的情况:线程需要等待某个事件发生,或者等待数据满足某个条件。例如,如果我们用一个互斥锁保护一个 Vec,就可能希望在它包含元素之前一直等待。

虽然互斥锁确实允许线程在锁被占用时等待,但它并不提供等待其他条件成立的功能。如果我们只有互斥锁可用,就只能不断地加锁、检查 Vec 里是否已经有元素,再解锁并重复这一过程。

线程挂起(Thread Parking)

一种等待来自其他线程通知的方式叫做线程挂起(thread parking)。线程可以将自己挂起,使其进入休眠状态,不再消耗任何 CPU 时间。随后,另一个线程可以将这个被挂起的线程唤醒。

线程挂起通过 std::thread::park() 函数实现。要唤醒线程,可以对表示该线程的 Thread 对象调用 unpark() 方法。这个对象可以通过 spawn 返回的 JoinHandle 获取,或者在线程内部通过 std::thread::current() 获取。

下面我们来看一个示例,使用互斥锁在两个线程之间共享一个队列。新创建的线程负责从队列中消费元素,而主线程每秒向队列中插入一个新元素。当队列为空时,消费线程会通过线程挂起进入等待状态。

use std::collections::VecDeque;

fn main() {
let queue = Mutex::new(VecDeque::new());

thread::scope(|s| {
// 消费线程
let t = s.spawn(|| loop {
let item = queue.lock().unwrap().pop_front();
if let Some(item) = item {
dbg!(item);
} else {
thread::park();
}
});

// 生产线程
for i in 0.. {
queue.lock().unwrap().push_back(i);
t.thread().unpark();
thread::sleep(Duration::from_secs(1));
}
});
}

消费线程在一个无限循环中运行,从队列中弹出元素并使用 dbg! 宏输出。当队列为空时,它会调用 park() 进入休眠状态。一旦被唤醒,park() 调用返回,循环继续执行,再次从队列中取元素,直到队列再次为空。如此反复。

生产线程则每秒生成一个新的数字,将其压入队列中。每次添加元素后,它都会通过对应消费线程的 Thread 对象调用 unpark(),将消费线程唤醒,使其能够处理新元素。

这里有一个重要的观察点:即使完全移除线程挂起,这个程序在理论上仍然是正确的,只是效率较低。这一点非常关键,因为 park() 并不保证它只会因为某次匹配的 unpark() 而返回。虽然比较少见,但它可能会出现虚假唤醒(spurious wake-up)。我们的示例可以很好地应对这种情况:被唤醒的消费线程会重新加锁队列,发现队列仍然为空,于是立刻解锁并再次挂起自己。

线程挂起还有一个非常重要的性质:在调用 park() 之前发生的 unpark() 调用不会丢失。唤醒请求会被记录下来,下一次线程尝试挂起时,这个请求会被清除,并且线程会立刻继续执行,而不会真的进入休眠状态。为了理解这一点为何对正确性至关重要,我们来看一个可能的执行顺序(假设消费线程为 C,生产线程为 P):

  1. 消费线程 C 锁住队列。
  2. C 尝试从队列中弹出元素,但队列为空,得到 None
  3. C 解锁队列。
  4. 生产线程 P 锁住队列。
  5. P 向队列中压入一个新元素。
  6. P 解锁队列。
  7. P 调用 unpark() 通知 C 有新元素。
  8. C 调用 park() 进入休眠,等待更多元素。

尽管在步骤 3 解锁队列和步骤 8 挂起线程之间,时间窗口通常非常短,但步骤 4 到 7 完全有可能在这个窗口内发生。如果 unpark() 在目标线程尚未挂起时什么都不做,那么这次通知就会丢失,消费线程就会继续等待,即便队列中已经有了元素。

正是因为 unpark() 的请求会被保存并用于未来的 park() 调用,我们才不必担心这种情况。不过,unpark() 请求不会累积。如果连续调用两次 unpark(),然后再连续调用两次 park(),线程仍然会进入休眠。第一次 park() 会清除请求并立即返回,但第二次 park() 会正常进入休眠。

这意味着,在上面的示例中,我们必须只在确认队列为空时才挂起线程,而不是在每次处理完一个元素后都调用 park()。虽然在这个例子中由于有长达一秒的休眠,这种问题极不可能发生,但理论上,多个 unpark() 调用可能只对应唤醒一次 park()

不幸的是,如果 unpark() 恰好在 park() 返回之后、但在队列被重新加锁并清空之前被调用,那么这次 unpark() 虽然没有必要,却仍然会导致下一次 park() 立即返回。结果就是队列被额外地加锁和解锁了一次。这种行为不会影响程序的正确性,但会影响效率和性能。

这种机制在简单场景下(如我们的示例)运行良好,但一旦情况变复杂就会迅速失控。例如,如果有多个消费线程从同一个队列中取数据,生产线程就无法知道究竟哪个消费线程正在等待、也不知道该唤醒哪一个。生产线程必须精确地知道哪些线程在等待,以及它们等待的条件是什么。

条件变量(Condition Variables)

条件变量是一种更常用的、用于等待互斥锁保护数据发生变化的机制。它们有两个基本操作:等待(wait)*和*通知(notify)。线程可以在条件变量上等待,当其他线程对同一个条件变量发出通知时,等待的线程就会被唤醒。多个线程可以同时等待同一个条件变量,通知既可以唤醒其中一个线程,也可以唤醒所有等待的线程。

这意味着我们可以为某个特定事件或条件(例如“队列非空”)创建一个条件变量,并在该条件变量上等待。任何使该条件成立的线程,只需通知该条件变量即可,而不需要知道具体是哪些线程、或者有多少线程在等待这个通知。

为了解决在解锁互斥锁与进入等待状态之间可能错过通知的问题,条件变量提供了一种机制,可以原子地解锁互斥锁并进入等待状态。这意味着根本不存在通知丢失的时间窗口。

Rust 标准库通过 std::sync::Condvar 提供条件变量。它的 wait 方法接收一个 MutexGuard,以证明互斥锁已经被锁定。wait 会先解锁互斥锁,然后让线程进入休眠;当线程被唤醒后,它会重新锁住互斥锁,并返回一个新的 MutexGuard(证明锁再次被持有)。

条件变量提供了两个通知方法:

  • notify_one:唤醒一个等待中的线程(如果有)
  • notify_all:唤醒所有等待中的线程

下面我们将前面使用线程挂起的示例改为使用 Condvar

use std::sync::Condvar;

let queue = Mutex::new(VecDeque::new());
let not_empty = Condvar::new();

thread::scope(|s| {
s.spawn(|| {
loop {
let mut q = queue.lock().unwrap();
let item = loop {
if let Some(item) = q.pop_front() {
break item;
} else {
q = not_empty.wait(q).unwrap();
}
};
drop(q);
dbg!(item);
}
});

for i in 0.. {
queue.lock().unwrap().push_back(i);
not_empty.notify_one();
thread::sleep(Duration::from_secs(1));
}
});

我们做了以下几处修改:

  • 现在不仅有一个包含队列的 Mutex,还新增了一个 Condvar,用于传达“队列非空”这一条件。
  • 不再需要知道具体要唤醒哪个线程,因此也不需要保存 spawn 的返回值。生产者通过 notify_one 通知条件变量即可。
  • 解锁、等待以及重新加锁都由 wait 方法完成。为了把 guard 传递给 wait,并在处理元素前丢弃它,我们对控制流做了一些调整。

现在,我们可以随意创建任意数量的消费线程,甚至在运行过程中再创建新的消费线程,而无需修改任何逻辑。条件变量会负责将通知发送给真正需要它的线程。

如果系统更加复杂,不同线程关心不同的条件,我们可以为每个条件定义一个单独的 Condvar。例如,一个条件变量表示“队列非空”,另一个表示“队列为空”,线程只需等待与自身任务相关的条件即可。

通常,一个 Condvar 只会与一个 Mutex 一起使用。如果两个线程试图使用不同的互斥锁在同一个条件变量上等待,可能会导致 panic。

条件变量的一个缺点是:它只能与 Mutex 一起使用。不过在大多数使用场景中,这并不是问题,因为数据本身通常就已经需要通过互斥锁来保护。

thread::park()Condvar::wait() 都有带超时的版本:thread::park_timeout()Condvar::wait_timeout()。它们额外接收一个 Duration 参数,用于指定在等待多长时间后放弃等待并无条件唤醒。

总结 (Summary)

  • 多个线程可以在同一个程序中并发运行,并且可以在任何时候创建。
  • 当主线程结束时,整个程序也会随之结束。
  • 数据竞争属于未定义行为,而 Rust 的类型系统(在安全代码中)可以完全防止这种情况。
  • 实现了 Send 的数据可以被发送到其他线程;实现了 Sync 的数据可以在线程之间共享。
  • 普通线程可能一直运行到程序结束,因此只能借用 'static 数据(如静态变量或被泄漏的分配)。
  • 引用计数(Arc)可以用于共享所有权,确保只要还有线程在使用,数据就不会被释放。
  • 作用域线程(scoped threads)可以限制线程的生命周期,从而允许借用非 'static 数据,例如局部变量。
  • &T 是共享引用,&mut T 是独占引用。普通类型不允许通过共享引用进行修改。
  • 某些类型通过 UnsafeCell 实现了内部可变性,从而允许通过共享引用修改数据。
  • CellRefCell 是单线程环境下的内部可变性工具;AtomicsMutexRwLock 是它们的多线程对应物。
  • Cell 和原子类型只能整体替换值;而 RefCellMutexRwLock 允许在运行时动态地强制访问规则,从而直接修改内部值。
  • 线程挂起是一种等待条件的便捷方式。
  • 当条件涉及由 Mutex 保护的数据时,使用 Condvar 通常比线程挂起更方便,也更高效。