优雅关机与内存清理

Graceful Shutdown and Cleanup

清单 20-20 中的代码,经由线程池而如咱们所设想的那样,异步响应请求。咱们会收到有关 workersidthread 这三个,咱们未以直接方式用到字段的一些告警,这些告警就提醒了咱们,咱们没有清理任何东西。当咱们使用不那么优雅的 Ctrl + c 方式,来挂起主线程时,全部其他线程也会被立即停止,即使他们处于服务某个请求中。

接下来,咱们随后将实现 Drop 特质,以在线程池中各个线程上调用 join,如此这些线程便可以在关闭前,完成他们正工作于其上的请求。随后咱们将实现一种告知线程他们应停止接受新请求并关闭的方法。为观察这方面代码的运作,咱们将把咱们的服务器,修改为在有序关闭其线程池之前,只接受两个请求。

实现 ThreadPool 上的 Drop 特质

Implementing the Drop Trait on ThreadPool

咱们来以在咱们的线程池上实现 Drop 开始。当线程池被丢弃时,咱们的那些线程就都应归拢,join 一下,以确保他们完成他们的工作。下面清单 20-22 给出了 Drop 实现的首次尝试;此代码尚不会很好地编译。

文件名:src/lib.rs

#![allow(unused)]
fn main() {
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println! ("关闭 worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}
}

清单 20-22:在线程池超出作用域时归拢各个线程

注:关于线程的 join 方法,请参考 Java Thread.join详解Joining Threads in Java

首选,咱们遍历了线程池中 workers 的各个线程。由于 self 是个可变引用,且咱们还需要能修改 worker,因此咱们为这个遍历使用了 &mut。对于各个 worker,咱们打印了讲到这个特定 worker 正要关闭的一条消息,并在随后在那个 worker 的线程上调用了 join。当到 join 的这个调用失败时,咱们便使用 unwrap 来令到 Rust 终止运行,并进入到非优雅有序关闭。

下面时在咱们编译这代码时,得到的报错信息:

$ cargo check
    Checking hello v0.1.0 (/home/lenny.peng/rust-lang-zh_CN/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:71:13
   |
71 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/thread/mod.rs:1589:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error

这个报错告诉我们,由于咱们只有各个 worker 的可变借用,而 join 会取得其参数的所有权,因此咱们无法调用 join。为解决这个额外难题,咱们就需要将线程从拥有 threadWorker 实例迁出,如此 join 就可以消费那个线程了。咱们曾在清单 17-15 中这样做过:若 Worker 保存的是一个 Option<thread::JoinHandle<()>>,那么咱们就可以在 Option 上调用 take 方法,来将 Some 变种中的那个值迁出,并在其位置处留下一个 None。也就是说,正运行的一个 Worker,将有着 thread 中的一个 Some 变种,而当咱们打算清理某个 Worker 时,咱们就将以 None 来替换 Some,如此那个 Worker 就没有了要运行的线程了。

因此咱们就明白了咱们是要如下更新 Worker 的定义:

文件名:src/lib.rs

#![allow(unused)]
fn main() {
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}
}

现在咱们来依靠编译器,找出其他需要修改的地方。对此代码进行检查,咱们会得到两个报错:

$ cargo check
    Checking hello v0.1.0 (/home/lenny.peng/rust-lang-zh_CN/hello)
error[E0308]: mismatched types
  --> src/lib.rs:62:22
   |
62 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct `JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
62 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

error[E0599]: no method named `join` found for enum `Option` in the current scope
  --> src/lib.rs:71:27
   |
71 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in `Option<JoinHandle<()>>`
   |
note: the method `join` exists on the type `JoinHandle<()>`
  --> /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/thread/mod.rs:1589:5
help: consider using `Option::expect` to unwrap the `JoinHandle<()>` value, panicking if the value is an `Option::None`
   |
71 |             worker.thread.expect("REASON").join().unwrap();
   |                          +++++++++++++++++

Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors

咱们来解决那第二个错误,其指向了 Worker::new 末尾的代码;在创建新 Worker 时,咱们需要把那个 thread 值封装在 Some 中。请做出如下修改来修复这个错误:

文件名:src/lib.rs

#![allow(unused)]
fn main() {
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --跳过代码--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}
}

第一个错误是在咱们的 Drop 实现中,早先咱们曾提到,咱们原本打算调用这个 Option 值上的 take,来将 threadworker 中迁出。下面的修改就将这样做:

文件名:src/lib.rs

#![allow(unused)]
fn main() {
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println! ("关闭 worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
}

正如曾在第 17 章中讨论过的那样,Option 上的 take 方法,会将那个 Some 变种取出,并在其位置处留下 None。咱们使用了 if let 来解构那个 Some 而得到了那个线程;随后咱们在线程上调用了 join。若某个 worker 的线程已经是 None,那么咱们就知道那个 worker 已经让他的线程清理掉了,因此在那种情况下什么也不会发生。

通知线程停止收听作业

Signaling to the Threads to Stop Listening for Jobs

在咱们已做出的全部修改下,咱们的代码会不带任何错误的编译了。但是,坏消息是这些代码尚不会按照咱们想要的方式运作。问题关键在于,由 Worker 实例的线程运行的闭包中的逻辑:此刻,咱们调用了 join,但由于线程是在一直 loop 查找作业,所以那样做将不会关闭线程。若咱们以咱们当前的 drop 实现丢弃咱们的 ThreadPool,那么主线程将一直阻塞于等待第一个线程结束。

为修复这个问题,咱们将需要 ThreadPooldrop 实现中的一个修改,以及其后的 Worker 循环中的一个修改。

首选,咱们将把 ThreadPooldrop 实现,修改为在等待线程结束前显式地丢弃 sender。下面清单 20-23 给出了对 ThreadPool 显示丢弃 sender 的修改。为能将 sendThreadPool 迁出,咱们使用了与咱们曾对线程做过的同样 Optiontake 技巧:

文件名:src/lib.rs

#![allow(unused)]
fn main() {
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --跳过代码--
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        // --跳过代码--

        ThreadPool {
            workers,
            sender: Some<sender>,
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println! ("关闭 worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
}

清单 20-23:在归拢那些 worker 线程前显式丢弃 sender

丢弃 sender 就会关闭通道,这表明将不会有更多消息发出。当那发生时,在无限循环中那些 worker 所做的到 recv 的全部全部调用,就会返回错误。在下面清单 20-24 中,咱们修改了 Worker 的循环,来在那种情况下优雅有序地退出循环,这就意味着在 ThreadPooldrop 实现在那些线程上调用 join 时,他们将结束。

文件名:src/lib.rs

#![allow(unused)]
fn main() {
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println! ("Worker {id} 获取到一项作业;执行中。");

                    job();
                }
                Err(_) => {
                    println! ("Worker {id} 已断开链接;关闭中。");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}
}

清单 20-24:在 recv 返回错误时显式跳出循环

要看到运作中的代码,咱们就来把 main 修改为在有序关闭服务器钱,只接收两个请求,如下清单 20-25 中所示。

文件名:src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_conn(stream);
        });
    }

    println! ("关闭中。");
}

清单 20-25: 在服务两个请求后通过退出循环关闭服务器

咱们是不会想要真实世界的 web 服务器在仅服务两个请求后就关闭的。这段代码只演示了这种有序关闭与清理是在正常工作。

其中的 take 方法,是定义在 Iterator 特质中的,且将迭代限制到最多头两个项目。在 main 的结束处,ThreadPool 将超出作用域,而 drop 实现将运行。

请以 cargo run 启动服务器,并构造三个请求。第三个请求应会出错,而在终端里咱们应看到类似于下面这样的输出:

$ cargo run                                                                   16s
    Finished dev [unoptimized + debuginfo] target(s) in 0.00s
     Running `target/debug/hello`
Worker 0 获取到一项作业;执行中。
关闭中。
关闭 worker 0
Worker 1 获取到一项作业;执行中。
Worker 3 已断开链接;关闭中。
Worker 2 已断开链接;关闭中。
Worker 1 已断开链接;关闭中。
Worker 0 已断开链接;关闭中。
关闭 worker 1
关闭 worker 2
关闭 worker 3

咱们可能会看到不同顺序的 worker 与消息打印出来。咱们能从这些消息,看出代码是如何工作的:worker 12 获取到了头两个请求。服务器在第二个 TCP 连接之后,便停止了接收连接,而 ThreadPool 上的 Drop 实现,在 worker 2 还没开始其作业前,便开始了执行。丢弃 sender 会断开全部 worker 并告诉他们要关闭。那些 worker 在他们断开连接时,都各自打印了一条消息,而随后线程池便调用了 join 来等待各个 worker 线程结束。

请注意这次特定执行的一个有趣方面:ThreadPool 弃用了 sender,而在有任何 worker 接收到错误前,咱们就尝试归拢了 worker 0worker 0 还不曾从 recv 获取到一个错误,因此主线程就阻塞于等待 worker 0 结束。与此同时,worker 1 收到了一项作业,而随后全部线程都收到了错误。在 worker 0 结束时,主线程就等待其余 worker 结束。而在那个时候,他们都已退出了他们的循环并停止了。

恭喜!咱们先进已经完成了咱们的项目;咱们有了一个运用线程池来异步响应的基本 web 服务器。咱们能够完成服务器有序关闭,这会清理掉线程池中的全部线程。

以下是用于参考的全部代码:

文件名:src/main.rs

use hello::ThreadPool;

use std::{
    fs,
    thread,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_conn(stream);
        });
    }

    println! ("关闭中。");
}

fn handle_conn(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let req_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &req_line[..] {
        "GET / HTTP/1.1" => ( "HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(10));
            ("HTTP/1.1 200 0K", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let resp =
        format! ("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(resp.as_bytes()).unwrap();
}

文件名:src/lib.rs

#![allow(unused)]
fn main() {
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// 创建出一个新的 ThreadPool。
    ///
    /// 其中的 size 为线程池中线程的数目。
    ///
    /// # 终止运行
    ///
    /// 这个 `new` 函数将在 size 为零时终止运行。
    pub fn new(size: usize) -> ThreadPool {
        assert! (size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println! ("关闭 worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println! ("Worker {id} 获取到一项作业;执行中。");

                    job();
                }
                Err(_) => {
                    println! ("Worker {id} 已断开链接;关闭中。");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}
}

这里咱们可以做更多事情!若咱们打算继续加强这个项目,下面是一些想法:

  • ThreadPool 及其公开方法添加更多文档;

  • 给这个库的功能添加测试;

  • 把一些调用修改为 unwrap,以获得更多的错误处理鲁棒性;

  • 运用 ThreadPool 来完成除服务 web 请求外的一些别的任务;

  • crates.io 上找到某个线程池代码箱,并用该代码箱实现一个类似的 web 服务器。随后将其 API 及鲁棒性,与咱们实现的线程池相比较。

本章小结

干得好!咱们已经读完了这整本书!要感谢咱们加入到这次 Rust 之旅中来。咱们现在已经准备好实现咱们自己的 Rust 项目,以及帮助其他人的项目了。请记住有那么一个由热衷于就咱们在 Rust 道路上,所遇到的任何挑战,而帮助咱们的其他 Rust 公民的热情社区。

Last change: 2023-12-01, commit: ecd2e9e