Apagado y limpieza eficientes
El código del Listing 21-20 está respondiendo requests de forma asíncrona
mediante el uso de un pool de threads, como pretendíamos, Recibimos
algunas advertencias sobre los campos workers
, id
y thread
que no
estamos usando de forma directa que nos recuerda que no estamos limpiando
nada. Cuando usamos el método menos elegante ctrl-c
para detener el thread principal, todos los demás threads se detienen
inmediatamente también, incluso si están en medio de servir una request.
A continuación, implementaremos el trait Drop
para llamar a join
en cada uno
de los threads del pool para que puedan terminar las requests en las que están
trabajando antes de cerrar. Luego implementaremos una forma de decirle a los
threads que deben dejar de aceptar nuevas requests y cerrarse. Para ver este
código en acción, modificaremos nuestro servidor para que acepte solo dos
requests antes de cerrar el pool de threads correctamente.
Algo importante a tener en cuenta mientras avanzamos: nada de esto afecta las partes del código que manejan la ejecución de los closures, por lo que todo aquí sería exactamente igual si estuviéramos usando un thread pool para un runtime asincrónico.
Implementando el Trait Drop
en ThreadPool
Comencemos implementando Drop
en nuestro pool de threads. Cuando el pool se
destruye, nuestros threads deberían unirse para asegurarse de que terminan su
trabajo. El Listing 21-22 muestra un primer intento de implementación de Drop
;
este código aún no funcionará.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
Primero, iteramos a través de cada uno de los workers
del pool de threads.
Usamos &mut
para esto porque self
es una referencia mutable, y también
necesitamos poder mutar worker
. Para cada worker, imprimimos un mensaje
diciendo que este worker en particular se está cerrando, y luego llamamos a
join
en el thread de ese worker. Si la llamada a join
falla, usamos
unwrap
para que Rust entre en pánico y haga una salida poco elegante.
Aquí está el error que obtenemos cuando compilamos este código:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/std/src/thread/mod.rs:1763:17
|
1763 | pub fn join(self) -> Result<T> {
| ^^^^
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
El error nos dice que no podemos llamar a join
porque solo tenemos un
mutable borrow de cada worker
y join
toma el ownership de su argumento.
Para solucionar este problema, necesitamos mover el thread fuera de la
instancia de Worker
que posee thread
para que join
pueda consumir el
thread. Hicimos esto en el Listing 17-15: si Worker
tiene un
Option<thread::JoinHandle<()>>
en su lugar, podemos llamar al método
take
en el Option
para mover el valor fuera de la variante Some
y
dejar una variante None
en su lugar. En otras palabras, un Worker
que
se está ejecutando tendrá una variante Some
en thread
, y cuando
queramos limpiar un Worker
, reemplazaremos Some
con None
para que el
Worker
no tenga un thread para ejecutar.
Entonces sabemos que queremos actualizar la definición de Worker
de esta
manera:
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
Ahora usemos el compilador para encontrar los otros lugares que necesitan cambiar. Al verificar este código, obtenemos dos errores:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
--> src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in `Option<JoinHandle<()>>`
|
note: the method `join` exists on the type `JoinHandle<()>`
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/std/src/thread/mod.rs:1763:5
|
1763 | pub fn join(self) -> Result<T> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: consider using `Option::expect` to unwrap the `JoinHandle<()>` value, panicking if the value is an `Option::None`
|
52 | worker.thread.expect("REASON").join().unwrap();
| +++++++++++++++++
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected `Option<JoinHandle<()>>`, found `JoinHandle<_>`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Worker { id, thread: Some(thread) }
| +++++++++++++ +
Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` (lib) due to 2 previous errors
Abordemos el segundo error, que apunta al código al final de Worker::new
;
necesitamos envolver el valor thread
en Some
cuando creamos un nuevo
Worker
. Haga los siguientes cambios para corregir este error:
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker {
id,
thread: Some(thread),
}
}
}
El primer error está en nuestra implementación de Drop
. Mencionamos
anteriormente que pretendíamos llamar a take
en el valor Option
para mover
thread
fuera de worker
. Los siguientes cambios lo harán:
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker {
id,
thread: Some(thread),
}
}
}
Como discutimos en el Capítulo 18, el método take
en Option
toma la variante
Some
y deja None
en su lugar. Estamos usando if let
para deconstruir el
Some
y obtener el thread; luego llamamos a join
en el thread. Si el thread
de un worker ya es None
, sabemos que ese worker ya ha tenido su thread
limpiado, por lo que en ese caso no sucede nada.
Señalando a los threads que dejen de escuchar por jobs
Con todos los cambios que hemos hecho, nuestro código se compila sin advertencias.
Sin embargo, las malas noticias son que este código aún no funciona de la manera
que queremos. La clave es la lógica en los closures ejecutados por los threads
de las instancias de Worker
: en este momento, llamamos a join
, pero eso no
detendrá los threads porque se ejecutan en un loop
para siempre buscando jobs.
Si intentamos dejar caer nuestro ThreadPool
con nuestra implementación actual
de drop
, el thread principal se bloqueará para siempre esperando a que el
primer thread termine.
Para solucionar este problema, necesitamos un cambio en la implementación de
drop
de ThreadPool
y luego un cambio en el loop de Worker
.
En primer lugar, cambiemos la implementación de drop
de ThreadPool
para
soltar explícitamente el sender
antes de esperar a que los threads terminen.
El Listing 21-23 muestra los cambios en ThreadPool
para soltar explícitamente
sender
. Usamos la misma técnica Option
y take
que hicimos con el thread
para poder mover sender
fuera de ThreadPool
:
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker {
id,
thread: Some(thread),
}
}
}
Soltar sender
cierra el canal, lo que indica que no se enviarán más mensajes.
Cuando eso sucede, todas las llamadas a recv
que los workers hacen en el loop
infinito devolverán un error. En el Listing 21-24, cambiamos el loop de Worker
para salir del loop con gracia en ese caso, lo que significa que los hreads
terminarán cuando la implementación de drop
de ThreadPool
llame a join
en ellos.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Para ver este código en acción, modifiquemos main
para aceptar solo dos
requests antes de cerrar el servidor con gracia, como se muestra en el
Listing 21-25.
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
No querríamos que un servidor web del mundo real se apague después de servir solo dos requests. Este código solo demuestra que el apagado y la limpieza con gracia funcionan.
El método take
es definido en el trait Iterator
y limita la iteración
de los primeros dos items como máximo. El ThreadPool
saldrá del scope
al final de main
y la implementación drop
correrá.
Iniciamos el servidor con cargo run
y hacemos tres requests. La tercera
request debería fallar, y en su terminal debería ver una salida similar a esta:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Es posible que vea un orden diferente de workers y mensajes impresos. Podemos
ver cómo funciona este código a partir de los mensajes: los workers 0 y 3
obtuvieron las dos primeras requests. El servidor dejó de aceptar conexiones
después de la segunda conexión, y la implementación Drop
en ThreadPool
comienza a ejecutarse antes de que el worker 3 comience su trabajo. Al soltar
sender
desconecta a todos los workers y les dice que se apaguen. Los workers
imprimen un mensaje cuando se desconectan, y luego el pool de threads llama a
join
para esperar a que cada thread worker termine.
Fijémonos en un aspecto interesante de esta ejecución en particular: el
ThreadPool
soltó el sender
, y antes de que cualquier worker recibiera un
error, intentamos unirnos al worker 0. El worker 0 aún no había recibido un
error de recv
, por lo que el thread principal se bloqueó esperando a que el
worker 0 terminara. Mientras tanto, el worker 3 recibió un job y luego todos
los threads recibieron un error. Cuando el worker 0 terminó, el thread principal
esperó a que el resto de los workers terminaran. En ese momento, todos habían
salido de sus loops y se detuvieron.
¡Enhorabuena! Hemos completado nuestro proyecto; tenemos un servidor web básico que usa un pool de threads para responder de forma asíncrona. Podemos realizar un apagado con gracia del servidor, que limpia todos los threads del pool.
Aquí está el código completo como referencia:
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
¡Podríamos hacer más! Si quieres seguir mejorando este proyecto, aquí hay algunas ideas:
- Añadir más documentación a
ThreadPool
y sus métodos públicos. - Añadir tests de la funcionalidad de la librería.
- Cambiar las llamadas a
unwrap
por un manejo de errores más robusto. - Usar
ThreadPool
para realizar alguna tarea que no sea servir requests web. - Encontrar una librería de pool de threads en crates.io e implementar un servidor web similar usando la librería en su lugar. Luego compara su API y robustez con el pool de threads que implementamos.
Resumen
¡Bien hecho! ¡Has llegado al final del libro! Queremos agradecerte por unirte a nosotros en este tour de Rust. Ahora estás listo para implementar tus propios proyectos en Rust y ayudar con los proyectos de otras personas. Ten en cuenta que hay una comunidad acogedora de otros Rustaceans que estarían encantados de ayudarte con cualquier desafío que encuentres en tu viaje con Rust.