Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Delikatne zamykanie i sprzątanie

Kod z Listingu 21-20 odpowiada na żądania asynchronicznie, używając puli wątków, tak jak zamierzaliśmy. Otrzymujemy kilka ostrzeżeń o polach workers, id i thread, których nie używamy w bezpośredni sposób, co przypomina nam, że nic nie sprzątamy. Gdy użyjemy mniej eleganckiej metody ctrl-C do zatrzymania głównego wątku, wszystkie inne wątki są natychmiast zatrzymywane, nawet jeśli są w trakcie obsługi żądania.

Następnie zaimplementujemy cechę Drop, aby wywołać join dla każdego z wątków w puli, aby mogły zakończyć pracę nad żądaniami, zanim się zamkną. Następnie zaimplementujemy sposób, aby poinformować wątki, że powinny przestać akceptować nowe żądania i zamknąć się. Aby zobaczyć ten kod w akcji, zmodyfikujemy nasz serwer, aby akceptował tylko dwa żądania przed delikatnym zamknięciem puli wątków.

Jedna rzecz do zauważenia, gdy będziemy postępować: nic z tego nie wpływa na części kodu, które obsługują wykonywanie zamknięć, więc wszystko tutaj byłoby takie samo, gdybyśmy używali puli wątków do asynchronicznego środowiska uruchomieniowego.

Implementacja cechy Drop dla ThreadPool

Zacznijmy od zaimplementowania Drop dla naszej puli wątków. Kiedy pula zostanie zrzucona, wszystkie nasze wątki powinny się połączyć, aby upewnić się, że zakończyły swoją pracę. Listing 21-22 pokazuje pierwszą próbę implementacji Drop; ten kod jeszcze nie będzie działał poprawnie.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Zamykanie wątku roboczego {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} otrzymał zadanie; wykonuję.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Najpierw przechodzimy przez każdego z workerów puli wątków. Używamy &mut do tego, ponieważ self jest mutowalną referencją, a my musimy również być w stanie mutować worker. Dla każdego workera, drukujemy komunikat informujący, że ta konkretna instancja Worker jest zamykana, a następnie wywołujemy join na wątku tej instancji Worker. Jeśli wywołanie join zakończy się niepowodzeniem, używamy unwrap, aby Rust panikował i przeszedł w stan niełaskawego zamknięcia.

Oto błąd, który otrzymujemy podczas kompilacji tego kodu:

$ cargo check
    Sprawdzanie hello v0.1.0 (file:///projects/hello)
error[E0507]: nie można przenieść z `worker.thread`, który znajduje się za mutowalną referencją
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` przeniesiony z powodu wywołania tej metody
   |             |
   |             przeniesienie następuje, ponieważ `worker.thread` ma typ `JoinHandle<()>`, który nie implementuje cechy `Copy`
   |
note: `JoinHandle::<T>::join` przejmuje własność odbiornika `self`, co przenosi `worker.thread`
  --> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17

Więcej informacji o tym błędzie znajdziesz, używając `rustc --explain E0507`.
error: nie udało się skompilować `hello` (lib) z powodu 1 poprzedniego błędu

Błąd mówi nam, że nie możemy wywołać join, ponieważ mamy tylko mutowalne pożyczenie każdego workera, a join przejmuje własność swojego argumentu. Aby rozwiązać ten problem, musimy przenieść wątek z instancji Worker, która jest właścicielem thread, tak aby join mogło zużyć wątek. Jednym ze sposobów jest zastosowanie tego samego podejścia, które przyjęliśmy w Listingu 18-15. Gdyby Worker zawierał Option<thread::JoinHandle<()>>, moglibyśmy wywołać metodę take na Option, aby przenieść wartość z wariantu Some i pozostawić wariant None na jego miejscu. Innymi słowy, działający Worker miałby wariant Some w thread, a gdybyśmy chcieli posprzątać Workera, zastąpilibyśmy Some przez None, aby Worker nie miał wątku do uruchomienia.

Jednakże, jedynym momentem, kiedy to by się pojawiło, byłoby zrzucanie Workera. W zamian, musielibyśmy radzić sobie z Option<thread::JoinHandle<()>> wszędzie, gdzie uzyskiwaliśmy dostęp do worker.thread. Idiomatyczny Rust często używa Option, ale gdy zauważysz, że opakowujesz coś, co wiesz, że zawsze będzie obecne, w Option jako obejście, to dobrym pomysłem jest poszukanie alternatywnych podejść, aby uczynić kod czystszym i mniej podatnym na błędy.

W tym przypadku istnieje lepsza alternatywa: metoda Vec::drain. Akceptuje parametr zakresu, aby określić, które elementy usunąć z wektora i zwraca iterator tych elementów. Przekazanie składni zakresu .. usunie wszystkie wartości z wektora.

Zatem musimy zaktualizować implementację drop w ThreadPool w następujący sposób:

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Zamykanie wątku roboczego {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} otrzymał zadanie; wykonuję.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

To rozwiązuje błąd kompilatora i nie wymaga żadnych innych zmian w naszym kodzie. Zauważ, że ponieważ drop może być wywołany podczas paniki, unwrap również może spowodować panikę i doprowadzić do podwójnej paniki, co natychmiastowo crashuje program i kończy wszelkie trwające sprzątanie. Jest to w porządku dla przykładowego programu, ale nie jest zalecane dla kodu produkcyjnego.

Sygnalizowanie wątkom, aby przestały nasłuchiwać zadań

Dzięki wszystkim zmianom, które wprowadziliśmy, nasz kod kompiluje się bez żadnych ostrzeżeń. Niestety, ten kod nie działa tak, jak byśmy chcieli. Klucz leży w logice zamknięć uruchamianych przez wątki instancji Worker: obecnie wywołujemy join, ale to nie spowoduje zamknięcia wątków, ponieważ one loop w nieskończoność, szukając zadań. Jeśli spróbujemy zrzucić nasz ThreadPool z naszą obecną implementacją drop, główny wątek będzie blokował się w nieskończoność, czekając na zakończenie pierwszego wątku.

Aby rozwiązać ten problem, będziemy potrzebować zmiany w implementacji drop dla ThreadPool, a następnie zmiany w pętli Worker.

Najpierw zmienimy implementację drop w ThreadPool tak, aby jawnie zrzucić sender przed oczekiwaniem na zakończenie wątków. Listing 21-23 pokazuje zmiany w ThreadPool w celu jawnego zrzucenia sender. W przeciwieństwie do wątku, tutaj musimy użyć Option, aby móc przenieść sender z ThreadPool za pomocą Option::take.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Zamykanie wątku roboczego {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} otrzymał zadanie; wykonuję.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Zrzucenie sender zamyka kanał, co oznacza, że więcej wiadomości nie zostanie wysłanych. Kiedy to nastąpi, wszystkie wywołania recv, które instancje Worker wykonują w nieskończonej pętli, zwrócą błąd. W Listingu 21-24 zmieniamy pętlę Worker tak, aby w tym przypadku elegancko wychodziła z pętli, co oznacza, że wątki zakończą działanie, gdy implementacja drop w ThreadPool wywoła na nich join.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Zamykanie wątku roboczego {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} otrzymał zadanie; wykonuję.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} rozłączył się; zamykam.");
                        break;
                    }
                }
            }
        });

        Worker { id, thread }
    }
}

Aby zobaczyć ten kod w akcji, zmodyfikujmy main, aby akceptował tylko dwa żądania przed delikatnym zamknięciem serwera, jak pokazano na Listingu 21-25.

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Zamykam.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Nie chciałbyś, aby prawdziwy serwer WWW zamykał się po obsłużeniu zaledwie dwóch żądań. Ten kod jedynie demonstruje, że delikatne zamykanie i sprzątanie działa poprawnie.

Metoda take jest zdefiniowana w ceche Iterator i ogranicza iterację do maksymalnie dwóch pierwszych elementów. ThreadPool wyjdzie poza zakres na końcu main, a implementacja drop zostanie uruchomiona.

Uruchom serwer za pomocą cargo run i wyślij trzy żądania. Trzecie żądanie powinno zakończyć się błędem, a w terminalu powinieneś zobaczyć wynik podobny do tego:

$ cargo run
   Kompilowanie hello v0.1.0 (file:///projects/hello)
    Zakończono `dev` profil [nieoptymalny + debuginfo] cel(e) w 0.41s
     Uruchamianie `target/debug/hello`
Worker 0 otrzymał zadanie; wykonuję.
Zamykam.
Zamykanie wątku roboczego 0
Worker 3 otrzymał zadanie; wykonuję.
Worker 1 rozłączył się; zamykam.
Worker 2 rozłączył się; zamykam.
Worker 3 rozłączył się; zamykam.
Worker 0 rozłączył się; zamykam.
Zamykanie wątku roboczego 1
Zamykanie wątku roboczego 2
Zamykanie wątku roboczego 3

Możesz zobaczyć inną kolejność identyfikatorów Worker i wydrukowanych komunikatów. Z komunikatów widzimy, jak działa ten kod: instancje Worker 0 i 3 otrzymały pierwsze dwa żądania. Serwer przestał akceptować połączenia po drugim połączeniu, a implementacja Drop w ThreadPool zaczyna się wykonywać, zanim Worker 3 nawet rozpocznie swoje zadanie. Zrzucone sender odłącza wszystkie instancje Worker i informuje je o zamknięciu. Instancje Worker drukują komunikat, gdy się odłączają, a następnie pula wątków wywołuje join, aby poczekać, aż każdy wątek Worker zakończy działanie.

Zauważ jeden ciekawy aspekt tego konkretnego wykonania: ThreadPool zrzucił sender, a zanim którykolwiek Worker otrzymał błąd, próbowaliśmy dołączyć Worker 0. Worker 0 nie otrzymał jeszcze błędu z recv, więc główny wątek zablokował się, czekając na zakończenie Worker 0. W międzyczasie Worker 3 otrzymał zadanie, a następnie wszystkie wątki otrzymały błąd. Gdy Worker 0 zakończył działanie, główny wątek czekał na zakończenie pozostałych instancji Worker. W tym momencie wszystkie one wyszły ze swoich pętli i zatrzymały się.

Gratulacje! Właśnie zakończyliśmy nasz projekt; mamy podstawowy serwer WWW, który używa puli wątków do asynchronicznej odpowiedzi. Jesteśmy w stanie wykonać delikatne zamknięcie serwera, które czyści wszystkie wątki w puli.

Poniżej znajduje się cały kod dla odniesienia:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Zamykam.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Zamykanie wątku roboczego {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} otrzymał zadanie; wykonuję.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} rozłączył się; zamykam.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Można by tu zrobić więcej! Jeśli chcesz kontynuować ulepszanie tego projektu, oto kilka pomysłów:

  • Dodaj więcej dokumentacji do ThreadPool i jego publicznych metod.
  • Dodaj testy funkcjonalności biblioteki.
  • Zmień wywołania unwrap na bardziej solidną obsługę błędów.
  • Użyj ThreadPool do wykonania innego zadania niż obsługa żądań WWW.
  • Znajdź skrzynkę puli wątków na crates.io i zaimplementuj podobny serwer WWW, używając zamiast tego tej skrzynki. Następnie porównaj jej API i solidność z zaimplementowaną przez nas pulą wątków.

Podsumowanie

Świetnie! Dotarłeś do końca książki! Chcemy podziękować za dołączenie do nas w tej podróży po Rust. Jesteś teraz gotowy, aby zaimplementować własne projekty Rust i pomagać w projektach innych ludzi. Pamiętaj, że istnieje gościnna społeczność innych Rustaceans, którzy z przyjemnością pomogą ci w wszelkich wyzwaniach, które napotkasz w swojej podróży z Rustem.