放在一起:未来值、任务与线程

正如我们在 第 16 章 中所看到的,线程提供了一种并发的方法。我们在本章中看到了另一种方法:使用未来值与流的异步。如果咱们想要知道,何时该选择另一种方法,答案是:视情况而定!在很多情况下,我们需要选择的不是线程 异步,而是线程 异步。

数十年来,许多操作系统都提供了基于线程的并发模型,而许多编程语言也因此支持这些模型。不过,这些模型也并非没有代价。在许多操作系统上,每个线程都会占用相当多的内存,而且启动和关闭线程都会产生一些开销。也只有在操作系统和硬件支持的情况下,线程才可用。与主流台式机和便携电脑不同,一些嵌入式系统根本没有操作系统,因此他们也没有线程。

异步模型提供了一套不同的权衡机制,而成为一种终极补充。在异步模型中,并发操作不需要其各自的线程。相反,他们可运行于任务之上,就像我们在流小节中,使用 trpl::spawn_task 启动某个同步函数的工作一样。任务类似于线程,但他不是由操作系统管理,而是由库级别的代码(即运行时)管理。

在上一小节中,我们看到了可通过使用一个异步通道,及生成一个我们可从同步代码中调用的异步任务,而构建出一个流。我们也可使用线程,完成这完全一样的事情。在下面清单 17-40 中,我们使用标准库中的 trpl::spawn_tasktrpl::sleep 两个 APIs,替换了 get_intervals 中的异步通道与异步任务。

文件名:src/main.rs

#![allow(unused)]
fn main() {
fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    // This is *not* `trpl::spawn` but `std::thread::spawn`!
    thread::spawn(move || {
        let mut count = 0;
        loop {
            // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`!
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
}

清单 17-41:在 get_intervals 中使用 std::thread 而非异步的 trpl APIs

若咱们运行这段代码,输出结果会与清单 17-40 相同。请注意,从调用代码的角度来看,这里的变化微乎其微。更重要的是,尽管我们的一个函数在运行时上生成了个异步任务,而另一函数生成了个操作系统的线程,但得到的两个流,并没有受到这些差异的影响。

尽管这两种方法有相似之处,但他们的行为却大相径庭,尽管我们可能很难在这个非常简单的例子中测量出来。我们可以在任何现代个人电脑上,生成数百万个异步任务。但若我们试图用线程来做这件事,内存真的会用完!

然而,这些 API 如此相似是有原因的。线程充当了一些同步操作集的边界;线程 之间 可以并发。任务则充当了一些异步操作集的边界;任务 之间 和任务 内部 都可以并发,因为任务可以在其主体中的未来值之间切换。最后,未来值是 Rust 最细粒度的并发单元,每个未来值都可以代表一棵由其他未来值组成的树。运行时 -- 具体来说是运行时的执行器 -- 管理着任务,而任务管理着未来值。在这方面,任务类似于由运行时管理着的轻量级线程,同时由于是由运行时而不是操作系统管理,因此任务还是具有更多功能的轻量级线程。

这并不意味着异步任务总是要比线程更好(反之亦然)。在某些方面,相比于使用 async 的并发,使用线程的并发是一种更简单的编程模型。这可以是优点,也可以是缺点。线程在某种程度上是 “触发并遗忘” 的;他们没有与未来值相对应的原生对等体,因此除非被操作系统本身打断,他们运行即可完成。也就是说,线程并不像未来值那样,支持 任务内的并发。Rust 中的线程也没有取消机制 -- 我们在本章中没有明确涉及这一主题,但每当我们结束某个未来值时,其状态就会被正确清理,这一事实暗示了任务的取消机制。

这些限制也使得线程比期货更难于组装。例如,使用线程构建 timeoutthrottle 方法等辅助工具,就比我们在本章前面所构建的要困难得多。正如我们所看到的,未来值是一种更丰富的数据结构,这意味着他们可以更自然地组合在一起。

因此,任务给到我们对未来值的 额外 控制,允许我们选择在何处以及如何对他们分组。事实证明,线程和任务往往能配合得很好,因为任务可以(至少在某些运行时下)在线程间迁移。事实上,我们一直在使用的运行时,包括 spawn_blockingspawn_task 两个函数,默认情况下都是多线程的!许多运行时都使用了一种名为 工作偷取,work stealing 的方法,根据线程当前的使用情况,在线程间透明地迁移任务,以提高系统的整体性能。这种方法实际上需要线程 任务,因此也需要未来值。

在考虑何时使用哪种方法时,请考虑以下经验法则:

  • 如果工作的 并行性很强,比如处理每个部分都可以单独处理的大量数据时,线程是更好的选择;
  • 如果工作的 并发性很高,例如处理来自不同来源,可能以不同时间间隔或不同速度发送的消息时,那么异步是更好的选择。

如果咱们同时需要并行性和并发性,咱们就不必在线程和异步之间做出选择。咱们可自由地将他们结合在一起使用,让他们各自发挥其最擅长的部分。例如,下面清单 17-42 展示了,实际 Rust 代码中这种混合使用的一个相当常见的示例。

文件名:src/main.rs

use std::{thread, time::Duration};

fn main() {
    let (tx, mut rx) = trpl::channel();

    thread::spawn(move || {
        for i in 1..11 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    trpl::run(async {
        while let Some(message) = rx.recv().await {
            println!("{message}");
        }
    });
}

清单 17-42:在一个线程中以阻塞代码发送消息,并在一个异步代码块中等待消息

我们以创建一个异步通道开始,然后生成一个取得通道发送侧所有权的线程。在该线程中,我们发送数字 1 到 10,每个数字之间休眠一秒钟。最后,就像本章所做的那样,我们运行了一个以传递给 trpl::run 的异步代码块创建出的未来值。在这个未来值中,我们等待这些消息,就像在我们曾看到的其他消息传递示例中一样。

回到本章开头的场景,设想使用一个专门线程运行一组视频编码任务(因为视频编码是计算密集的),而以一个异步通道,通知用户界面这些操作已完成。在真实世界用例中,这类组合的例子数不胜数。

本章小结

这并不是咱们在本书中最后一次看到并发。第 21 章 中的项目,将在比这里讨论的简单示例更现实的情况下,应用这些概念,并更直接地比较使用线程和任务解决问题的方法。

无论咱们选择这些方法的哪种,Rust 都能为咱们提供编写安全、快速、并发代码所需的工具,无论是用于高吞吐量 web 服务器,还是某种嵌入式操作系统。

接下来,我们将讨论在咱们的 Rust 程序变大时,问题建模和构建解决方案的一些惯用方法。此外,我们还将讨论 Rust 的惯用语,与咱们在面向对象编程中熟悉的惯用语之间的关系。

(End)

Last change: 2025-05-27, commit: 20660c4

小额打赏,赞助 xfoss.com 长存......

微信 | 支付宝

若这里内容有帮助到你,请选择上述方式向 xfoss.com 捐赠。