Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Od serwera jednowątkowego do wielowątkowego

Obecnie serwer będzie przetwarzał każde żądanie po kolei, co oznacza, że nie przetworzy drugiego połączenia, dopóki pierwsze połączenie nie zostanie zakończone. Gdyby serwer otrzymywał coraz więcej żądań, to wykonanie sekwencyjne byłoby coraz mniej optymalne. Jeśli serwer otrzyma żądanie, którego przetwarzanie zajmuje dużo czasu, kolejne żądania będą musiały czekać, aż długie żądanie zostanie zakończone, nawet jeśli nowe żądania mogą być przetworzone szybko. Będziemy musieli to naprawić, ale najpierw przyjrzymy się problemowi w działaniu.

Symulacja wolnego żądania

Przyjrzymy się, jak wolno przetwarzane żądanie może wpływać na inne żądania wysyłane do naszej obecnej implementacji serwera. Listing 21-10 implementuje obsługę żądania do /sleep z symulowaną wolną odpowiedzią, która spowoduje, że serwer będzie spał przez pięć sekund przed odpowiedzią.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Przełączyliśmy się z if na match, ponieważ mamy teraz trzy przypadki. Musimy jawnie dopasować do wycinka request_line, aby dopasować wzorce do wartości literałów ciągów znaków; match nie wykonuje automatycznego referencjonowania i dereferencjonowania, tak jak to robi metoda równości.

Pierwsze ramię jest takie samo jak blok if z Listingu 21-9. Drugie ramię pasuje do żądania do /sleep. Po odebraniu tego żądania serwer będzie spał przez pięć sekund przed renderowaniem pomyślnej strony HTML. Trzecie ramię jest takie samo jak blok else z Listingu 21-9.

Możesz zobaczyć, jak prymitywny jest nasz serwer: prawdziwe biblioteki obsługiwałyby rozpoznawanie wielu żądań w znacznie mniej obszerny sposób!

Uruchom serwer za pomocą cargo run. Następnie otwórz dwa okna przeglądarki: jedno dla http://127.0.0.1:7878 i drugie dla http://127.0.0.1:7878/sleep. Jeśli wprowadzisz URI / kilka razy, jak poprzednio, zobaczysz, że odpowiada szybko. Ale jeśli wprowadzisz /sleep, a następnie załadujesz /, zobaczysz, że / czeka, aż sleep zakończy swój pełny pięciosekundowy czas, zanim zostanie załadowane.

Istnieje wiele technik, których moglibyśmy użyć, aby uniknąć kumulowania się żądań za wolnym żądaniem, w tym użycie async, jak zrobiliśmy to w Rozdziale 17; ta, którą zaimplementujemy, to pula wątków.

Zwiększanie przepustowości za pomocą puli wątków

Pula wątków to grupa uruchomionych wątków, które są gotowe i czekają na obsługę zadania. Kiedy program otrzymuje nowe zadanie, przypisuje jeden z wątków w puli do tego zadania, a ten wątek będzie przetwarzał zadanie. Pozostałe wątki w puli są dostępne do obsługi wszelkich innych zadań, które nadejdą, podczas gdy pierwszy wątek przetwarza. Kiedy pierwszy wątek zakończy przetwarzanie swojego zadania, zostaje on zwrócony do puli bezczynnych wątków, gotowy do obsługi nowego zadania. Pula wątków pozwala na współbieżne przetwarzanie połączeń, zwiększając przepustowość serwera.

Ograniczymy liczbę wątków w puli do niewielkiej liczby, aby chronić się przed atakami DoS; gdyby nasz program tworzył nowy wątek dla każdego żądania, gdy ono nadejdzie, ktoś, kto wykona 10 milionów żądań do naszego serwera, mógłby spowodować chaos, zużywając wszystkie zasoby naszego serwera i zatrzymując przetwarzanie żądań.

Zamiast tworzyć nieograniczoną liczbę wątków, będziemy mieć stałą liczbę wątków oczekujących w puli. Żądania, które nadejdą, są wysyłane do puli do przetworzenia. Pula będzie utrzymywać kolejkę przychodzących żądań. Każdy z wątków w puli pobierze żądanie z tej kolejki, obsłuży je, a następnie poprosi kolejkę o kolejne żądanie. Dzięki tej konstrukcji możemy przetwarzać do N żądań współbieżnie, gdzie N to liczba wątków. Jeśli każdy wątek odpowiada na długotrwałe żądanie, kolejne żądania mogą nadal gromadzić się w kolejce, ale zwiększyliśmy liczbę długotrwałych żądań, które możemy obsłużyć, zanim osiągniemy ten punkt.

Ta technika to tylko jeden z wielu sposobów na zwiększenie przepustowości serwera WWW. Inne opcje, które możesz zbadać, to model fork/join, jednowątkowy model asynchronicznego I/O oraz wielowątkowy model asynchronicznego I/O. Jeśli interesuje Cię ten temat, możesz poczytać więcej o innych rozwiązaniach i spróbować je zaimplementować; w języku niskiego poziomu, takim jak Rust, wszystkie te opcje są możliwe.

Zanim zaczniemy implementować pulę wątków, porozmawiajmy o tym, jak powinno wyglądać użycie puli. Kiedy próbujesz zaprojektować kod, najpierw napisanie interfejsu klienta może pomóc w kierowaniu twoim projektem. Napisz API kodu tak, aby było ono ustrukturyzowane w sposób, w jaki chcesz je wywołać; następnie zaimplementuj funkcjonalność w ramach tej struktury, zamiast implementować funkcjonalność, a następnie projektować publiczne API.

Podobnie jak w Rozdziale 12 stosowaliśmy rozwój sterowany testami, tutaj zastosujemy rozwój sterowany kompilatorem. Napiszemy kod, który wywołuje pożądane funkcje, a następnie będziemy analizować błędy kompilatora, aby określić, co powinniśmy zmienić, aby kod zadziałał. Zanim to jednak zrobimy, zbadamy technikę, której nie będziemy używać jako punktu wyjścia.

Struktura kodu, gdybyśmy mogli tworzyć wątek dla każdego żądania

Najpierw zbadajmy, jak nasz kod mógłby wyglądać, gdyby tworzył nowy wątek dla każdego połączenia. Jak wspomniano wcześniej, nie jest to nasz ostateczny plan ze względu na problemy z potencjalnym tworzeniem nieograniczonej liczby wątków, ale jest to punkt wyjścia do uzyskania najpierw działającego serwera wielowątkowego. Następnie dodamy pulę wątków jako ulepszenie, a kontrastowanie obu rozwiązań będzie łatwiejsze.

Listing 21-11 pokazuje zmiany, które należy wprowadzić w funkcji main, aby utworzyć nowy wątek do obsługi każdego strumienia w pętli for.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Jak nauczyłeś się w Rozdziale 16, thread::spawn utworzy nowy wątek, a następnie uruchomi kod w zamknięciu w nowym wątku. Jeśli uruchomisz ten kod i załadujesz /sleep w przeglądarce, a następnie / w dwóch kolejnych zakładkach przeglądarki, faktycznie zobaczysz, że żądania do / nie muszą czekać na zakończenie /sleep. Jednak, jak wspomnieliśmy, ostatecznie to przeciąży system, ponieważ tworzyłbyś nowe wątki bez żadnego limitu.

Możesz również pamiętać z Rozdziału 17, że to jest dokładnie ten rodzaj sytuacji, w której async i await naprawdę błyszczą! Miej to na uwadze, gdy budujemy pulę wątków i zastanawiamy się, jak wyglądałyby rzeczy inaczej lub tak samo z async.

Tworzenie skończonej liczby wątków

Chcemy, aby nasza pula wątków działała w podobny, znajomy sposób, tak aby przejście z wątków na pulę wątków nie wymagało dużych zmian w kodzie, który używa naszego API. Listing 21-12 przedstawia hipotetyczny interfejs struktury ThreadPool, której chcemy użyć zamiast thread::spawn.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Używamy ThreadPool::new do utworzenia nowej puli wątków z konfigurowalną liczbą wątków, w tym przypadku czterech. Następnie, w pętli for, pool.execute ma podobny interfejs do thread::spawn w tym sensie, że przyjmuje zamknięcie, które pula powinna uruchomić dla każdego strumienia. Musimy zaimplementować pool.execute tak, aby przyjmował zamknięcie i przekazywał je do wątku w puli do uruchomienia. Ten kod jeszcze się nie skompiluje, ale spróbujemy, aby kompilator mógł nas poprowadzić, jak go naprawić.

Budowanie ThreadPool za pomocą rozwoju sterowanego kompilatorem

Wprowadź zmiany z Listingu 21-12 do src/main.rs, a następnie użyjmy błędów kompilatora z cargo check, aby kierować naszym rozwojem. Oto pierwszy błąd, który otrzymujemy:

$ cargo check
    Sprawdzanie hello v0.1.0 (file:///projects/hello)
error[E0433]: nie udało się rozwiązać: użycie niezadeklarowanego typu `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ użycie niezadeklarowanego typu `ThreadPool`

Więcej informacji o tym błędzie znajdziesz, używając `rustc --explain E0433`.
error: nie udało się skompilować `hello` (bin "hello") z powodu 1 poprzedniego błędu

Świetnie! Ten błąd mówi nam, że potrzebujemy typu lub modułu ThreadPool, więc teraz go zbudujemy. Nasza implementacja ThreadPool będzie niezależna od rodzaju pracy, jaką wykonuje nasz serwer WWW. Zatem, zmieńmy skrzynkę hello z skrzynki binarnej na skrzynkę biblioteki, aby przechowywać naszą implementację ThreadPool. Po zmianie na skrzynkę biblioteki, moglibyśmy również używać oddzielnej biblioteki puli wątków do dowolnej pracy, którą chcemy wykonać za pomocą puli wątków, a nie tylko do obsługi żądań WWW.

Utwórz plik src/lib.rs zawierający następującą, najprostszą definicję struktury ThreadPool, jaką możemy na razie mieć:

pub struct ThreadPool;

Następnie edytuj plik main.rs, aby wprowadzić ThreadPool do zakresu z skrzynki biblioteki, dodając następujący kod na początku src/main.rs:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Ten kod nadal nie będzie działać, ale sprawdźmy go ponownie, aby otrzymać następny błąd, którym musimy się zająć:

$ cargo check
    Sprawdzanie hello v0.1.0 (file:///projects/hello)
error[E0599]: nie znaleziono funkcji ani elementu stowarzyszonego o nazwie `new` dla struktury `ThreadPool` w bieżącym zakresie
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ nie znaleziono funkcji ani elementu stowarzyszonego w `ThreadPool`

Więcej informacji o tym błędzie znajdziesz, używając `rustc --explain E0599`.
error: nie udało się skompilować `hello` (bin "hello") z powodu 1 poprzedniego błędu

Ten błąd wskazuje, że następnym krokiem jest utworzenie stowarzyszonej funkcji o nazwie new dla ThreadPool. Wiemy również, że new musi mieć jeden parametr, który może przyjąć 4 jako argument i powinien zwracać instancję ThreadPool. Zaimplementujmy najprostszą funkcję new, która będzie miała te cechy:

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Wybraliśmy usize jako typ parametru size, ponieważ wiemy, że ujemna liczba wątków nie ma sensu. Wiemy również, że użyjemy tej 4 jako liczby elementów w kolekcji wątków, do czego służy typ usize, jak omówiono w sekcji „Typy całkowite” w Rozdziale 3.

Sprawdźmy ponownie kod:

$ cargo check
    Sprawdzanie hello v0.1.0 (file:///projects/hello)
error[E0599]: nie znaleziono metody o nazwie `execute` dla struktury `ThreadPool` w bieżącym zakresie
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ nie znaleziono metody w `ThreadPool`

Więcej informacji o tym błędzie znajdziesz, używając `rustc --explain E0599`.
error: nie udało się skompilować `hello` (bin "hello") z powodu 1 poprzedniego błędu

Teraz błąd występuje, ponieważ nie mamy metody execute na ThreadPool. Przypomnij sobie z sekcji „Tworzenie skończonej liczby wątków”, że zdecydowaliśmy, iż nasza pula wątków powinna mieć interfejs podobny do thread::spawn. Ponadto, zaimplementujemy funkcję execute tak, aby pobierała przekazane jej zamknięcie i oddawała je bezczynnemu wątkowi w puli do uruchomienia.

Zdefiniujemy metodę execute na ThreadPool, aby przyjmowała zamknięcie jako parametr. Przypomnij sobie z sekcji „Przenoszenie przechwyconych wartości poza zamknięcia” w Rozdziale 13, że możemy przyjmować zamknięcia jako parametry z trzema różnymi cechami: Fn, FnMut i FnOnce. Musimy zdecydować, jaki rodzaj zamknięcia użyć tutaj. Wiemy, że skończymy robiąc coś podobnego do implementacji thread::spawn z biblioteki standardowej, więc możemy spojrzeć na ograniczenia, jakie ma sygnatura thread::spawn na swoim parametrze. Dokumentacja pokazuje nam następujące:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Parametr typu F jest tym, który nas tutaj interesuje; parametr typu T jest związany z wartością zwracaną, a tym się nie zajmujemy. Widzimy, że spawn używa FnOnce jako ograniczenia cechy dla F. To prawdopodobnie to, czego chcemy, ponieważ ostatecznie przekażemy argument, który otrzymamy w execute, do spawn. Możemy być jeszcze bardziej pewni, że FnOnce to cecha, której chcemy użyć, ponieważ wątek do uruchomienia żądania wykona zamknięcie tego żądania tylko raz, co pasuje do Once w FnOnce.

Parametr typu F ma również ograniczenie cechy Send i ograniczenie czasu życia 'static, które są przydatne w naszej sytuacji: potrzebujemy Send do przeniesienia zamknięcia z jednego wątku do drugiego oraz 'static, ponieważ nie wiemy, jak długo wątek będzie wykonywał. Stwórzmy metodę execute na ThreadPool, która będzie przyjmować parametr generyczny typu F z tymi ograniczeniami:

pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Nadal używamy () po FnOnce, ponieważ to FnOnce reprezentuje zamknięcie, które nie przyjmuje żadnych parametrów i zwraca typ jednostkowy (). Podobnie jak w definicjach funkcji, typ zwracany może być pominięty z sygnatury, ale nawet jeśli nie mamy parametrów, nadal potrzebujemy nawiasów.

Powołując się na to, jest to najprostsza implementacja metody execute: nic nie robi, ale staramy się tylko, aby nasz kod się skompilował. Sprawdźmy to ponownie:

$ cargo check
    Sprawdzanie hello v0.1.0 (file:///projects/hello)
    Zakończono `dev` profil [nieoptymalny + debuginfo] cel(e) w 0.24s

Kompiluje się! Ale zwróć uwagę, że jeśli spróbujesz cargo run i wyślesz żądanie w przeglądarce, zobaczysz błędy w przeglądarce, które widzieliśmy na początku rozdziału. Nasza biblioteka jeszcze nie wywołuje zamknięcia przekazanego do execute!

Uwaga: Powiedzenie, które możesz usłyszeć o językach z rygorystycznymi kompilatorami, takich jak Haskell i Rust, brzmi: „Jeśli kod się kompiluje, to działa”. Ale to powiedzenie nie jest uniwersalnie prawdziwe. Nasz projekt się kompiluje, ale absolutnie nic nie robi! Gdybyśmy budowali prawdziwy, kompletny projekt, byłby to dobry moment, aby zacząć pisać testy jednostkowe, aby sprawdzić, czy kod się kompiluje i ma pożądane zachowanie.

Zastanów się: co byłoby tu inne, gdybyśmy zamiast zamknięcia wykonywali przyszłość?

Walidacja liczby wątków w new

Nic nie robimy z parametrami new i execute. Zaimplementujmy ciała tych funkcji z pożądanym zachowaniem. Na początek pomyślmy o new. Wcześniej wybraliśmy typ bez znaku dla parametru size, ponieważ pula z ujemną liczbą wątków nie ma sensu. Jednak pula z zerową liczbą wątków również nie ma sensu, a zero jest jak najbardziej prawidłowym usize. Dodamy kod, który sprawdzi, czy size jest większe od zera, zanim zwrócimy instancję ThreadPool, i spowodujemy panikę programu, jeśli otrzyma zero, używając makra assert!, jak pokazano na Listingu 21-13.

pub struct ThreadPool;

impl ThreadPool {
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Dodaliśmy również dokumentację dla naszego ThreadPool za pomocą komentarzy doc. Zauważ, że zastosowaliśmy dobre praktyki dokumentacyjne, dodając sekcję, która wskazuje sytuacje, w których nasza funkcja może spowodować panikę, jak omówiono w Rozdziale 14. Spróbuj uruchomić cargo doc --open i kliknąć strukturę ThreadPool, aby zobaczyć, jak wyglądają wygenerowane dokumenty dla new!

Zamiast dodawać makro assert!, jak to zrobiliśmy tutaj, moglibyśmy zmienić new na build i zwrócić Result, tak jak zrobiliśmy to z Config::build w projekcie I/O w Listingu 12-9. Ale w tym przypadku zdecydowaliśmy, że próba utworzenia puli wątków bez żadnych wątków powinna być nieodwracalnym błędem. Jeśli czujesz się ambitny, spróbuj napisać funkcję o nazwie build z następującą sygnaturą, aby porównać ją z funkcją new:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

Tworzenie miejsca na przechowywanie wątków

Teraz, gdy wiemy, że mamy prawidłową liczbę wątków do przechowywania w puli, możemy utworzyć te wątki i przechowywać je w strukturze ThreadPool przed zwróceniem tej struktury. Ale jak „przechowujemy” wątek? Spójrzmy jeszcze raz na sygnaturę thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Funkcja spawn zwraca JoinHandle<T>, gdzie T to typ, który zwraca zamknięcie. Spróbujmy użyć JoinHandle i zobaczmy, co się stanie. W naszym przypadku zamknięcia, które przekazujemy do puli wątków, będą obsługiwać połączenie i nic nie zwracać, więc T będzie typem jednostkowym ().

Kod z Listingu 21-14 skompiluje się, ale jeszcze nie tworzy żadnych wątków. Zmieniliśmy definicję ThreadPool, aby przechowywała wektor instancji thread::JoinHandle<()>, zainicjalizowaliśmy wektor o pojemności size, skonfigurowaliśmy pętlę for, która uruchomi kod do tworzenia wątków, i zwróciła instancję ThreadPool zawierającą je.

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // utwórz kilka wątków i zapisz je w wektorze
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Wprowadziliśmy std::thread do zakresu w skrzynce biblioteki, ponieważ używamy thread::JoinHandle jako typu elementów w wektorze w ThreadPool.

Po otrzymaniu prawidłowego rozmiaru, nasz ThreadPool tworzy nowy wektor, który może przechowywać size elementów. Funkcja with_capacity wykonuje to samo zadanie co Vec::new, ale z ważną różnicą: wstępnie alokuje miejsce w wektorze. Ponieważ wiemy, że musimy przechowywać size elementów w wektorze, wykonanie tej alokacji z góry jest nieco bardziej wydajne niż użycie Vec::new, który zmienia rozmiar w miarę wstawiania elementów.

Po ponownym uruchomieniu cargo check powinno się udać.

Wysyłanie kodu z ThreadPool do wątku

Zostawiliśmy komentarz w pętli for w Listingu 21-14 dotyczący tworzenia wątków. Tutaj przyjrzymy się, jak faktycznie tworzymy wątki. Biblioteka standardowa zapewnia thread::spawn jako sposób tworzenia wątków, a thread::spawn oczekuje kodu, który wątek powinien uruchomić natychmiast po utworzeniu wątku. Jednak w naszym przypadku chcemy utworzyć wątki i sprawić, by czekały na kod, który wyślemy później. Implementacja wątków w bibliotece standardowej nie zawiera sposobu, aby to zrobić; musimy to zaimplementować ręcznie.

Zaimplementujemy to zachowanie, wprowadzając nową strukturę danych między ThreadPool a wątkami, która będzie zarządzać tym nowym zachowaniem. Nazwiemy tę strukturę danych Worker, co jest powszechnym terminem w implementacjach puli. Worker pobiera kod, który musi zostać uruchomiony, i uruchamia go w swoim wątku.

Pomyśl o ludziach pracujących w kuchni w restauracji: pracownicy czekają, aż przyjdą zamówienia od klientów, a następnie są odpowiedzialni za przyjęcie tych zamówień i ich zrealizowanie.

Zamiast przechowywać wektor instancji JoinHandle<()> w puli wątków, będziemy przechowywać instancje struktury Worker. Każdy Worker będzie przechowywał pojedynczą instancję JoinHandle<()>. Następnie zaimplementujemy metodę w Worker, która przyjmie zamknięcie kodu do uruchomienia i wyśle je do już działającego wątku do wykonania. Każdemu Workerowi nadamy również id, abyśmy mogli odróżnić różne instancje Worker w puli podczas logowania lub debugowania.

Oto nowy proces, który nastąpi po utworzeniu ThreadPool. Kod, który wysyła zamknięcie do wątku, zaimplementujemy po skonfigurowaniu Worker w ten sposób:

  1. Zdefiniuj strukturę Worker, która zawiera id i JoinHandle<()>.
  2. Zmień ThreadPool, aby zawierał wektor instancji Worker.
  3. Zdefiniuj funkcję Worker::new, która przyjmuje numer id i zwraca instancję Worker zawierającą id i wątek uruchomiony z pustym zamknięciem.
  4. W ThreadPool::new użyj licznika pętli for do wygenerowania id, utwórz nowego Worker z tym id i przechowuj Worker w wektorze.

Jeśli jesteś gotowy na wyzwanie, spróbuj samodzielnie zaimplementować te zmiany, zanim spojrzysz na kod w Listingu 21-15.

Gotowy? Oto Listing 21-15 z jednym ze sposobów wprowadzenia powyższych modyfikacji.

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

Zmieniliśmy nazwę pola w ThreadPool z threads na workers, ponieważ teraz przechowuje ono instancje Worker zamiast instancji JoinHandle<()>. Używamy licznika w pętli for jako argumentu do Worker::new i przechowujemy każdego nowego Worker w wektorze o nazwie workers.

Zewnętrzny kod (taki jak nasz serwer w src/main.rs) nie musi znać szczegółów implementacji dotyczących używania struktury Worker w ThreadPool, dlatego sprawiamy, że struktura Worker i jej funkcja new są prywatne. Funkcja Worker::new używa podanego przez nas id i przechowuje instancję JoinHandle<()>, która jest tworzona poprzez uruchomienie nowego wątku za pomocą pustego zamknięcia.

Uwaga: Jeśli system operacyjny nie może utworzyć wątku z powodu niewystarczających zasobów systemowych, thread::spawn spowoduje panikę. To spowoduje panikę całego naszego serwera, nawet jeśli utworzenie niektórych wątków może się powieść. Dla uproszczenia, takie zachowanie jest w porządku, ale w produkcyjnej implementacji puli wątków prawdopodobnie chciałbyś użyć std::thread::Builder i jego metody spawn, która zamiast tego zwraca Result.

Ten kod skompiluje się i przechowuje liczbę instancji Worker, którą określiliśmy jako argument do ThreadPool::new. Ale nadal nie przetwarzamy zamknięcia, które otrzymujemy w execute. Przyjrzyjmy się, jak to zrobić w następnej kolejności.

Wysyłanie żądań do wątków za pośrednictwem kanałów

Następny problem, którym się zajmiemy, to fakt, że zamknięcia przekazane do thread::spawn absolutnie nic nie robią. Obecnie zamknięcie, które chcemy wykonać, otrzymujemy w metodzie execute. Ale musimy przekazać thread::spawn zamknięcie do uruchomienia, gdy tworzymy każdego Worker podczas tworzenia ThreadPool.

Chcemy, aby struktury Worker, które właśnie utworzyliśmy, pobierały kod do uruchomienia z kolejki przechowywanej w ThreadPool i wysyłały ten kod do swojego wątku do wykonania.

Kanały, o których dowiedzieliśmy się w Rozdziale 16 – prosty sposób komunikacji między dwoma wątkami – byłyby idealne do tego zastosowania. Użyjemy kanału do pełnienia funkcji kolejki zadań, a execute wyśle zadanie z ThreadPool do instancji Worker, które wyślą zadanie do swojego wątku. Oto plan:

  1. ThreadPool utworzy kanał i będzie trzymać się nadawcy.
  2. Każdy Worker będzie trzymać się odbiorcy.
  3. Utworzymy nową strukturę Job, która będzie przechowywać zamknięcia, które chcemy wysłać kanałem.
  4. Metoda execute wyśle zadanie, które chce wykonać, za pośrednictwem nadawcy.
  5. W swoim wątku, Worker będzie iterował po swoim odbiorniku i wykonywał zamknięcia wszystkich otrzymanych zadań.

Zacznijmy od utworzenia kanału w ThreadPool::new i przechowywania nadawcy w instancji ThreadPool, jak pokazano na Listingu 21-16. Struktura Job na razie nic nie przechowuje, ale będzie typem elementu, który wysyłamy kanałem.

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

W ThreadPool::new tworzymy nasz nowy kanał i sprawiamy, że pula przechowuje nadawcę. To się pomyślnie skompiluje.

Spróbujmy przekazać odbiornik kanału do każdego Worker’a, gdy pula wątków tworzy kanał. Wiemy, że chcemy użyć odbiornika w wątku, który uruchamiają instancje Worker, więc odwołamy się do parametru receiver w zamknięciu. Kod z Listingu 21-17 jeszcze się nie skompiluje.

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Wprowadziliśmy kilka małych i prostych zmian: przekazujemy odbiornik do Worker::new, a następnie używamy go wewnątrz zamknięcia.

Kiedy próbujemy sprawdzić ten kod, otrzymujemy następujący błąd:

$ cargo check
    Sprawdzanie hello v0.1.0 (file:///projects/hello)
error[E0382]: użycie przeniesionej wartości: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- następuje przeniesienie, ponieważ `receiver` ma typ `std::sync::mpsc::Receiver<Job>`, który nie implementuje cechy `Copy`
...
25 |         for id in 0..size {
   |         ----------------- wewnątrz tej pętli
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ wartość przeniesiona tutaj, w poprzedniej iteracji pętli
   |
note: rozważ zmianę typu tego parametru w metodzie `new` na pożyczanie, jeśli posiadanie wartości nie jest konieczne
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- w tej metodzie       ^^^^^^^^^^^^^^^^^^^ ten parametr przejmuje własność wartości
help: rozważ przeniesienie wyrażenia poza pętlę, aby było przeniesione tylko raz
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

Więcej informacji o tym błędzie znajdziesz, używając `rustc --explain E0382`.
error: nie udało się skompilować `hello` (lib) z powodu 1 poprzedniego błędu

Kod próbuje przekazać receiver do wielu instancji Worker. To nie zadziała, jak pamiętacie z Rozdziału 16: implementacja kanału dostarczana przez Rusta to kanał wielu producentów, jednego konsumenta. Oznacza to, że nie możemy po prostu sklonować końca konsumującego kanału, aby naprawić ten kod. Nie chcemy również wysyłać wiadomości wielokrotnie do wielu konsumentów; chcemy jednej listy wiadomości z wieloma instancjami Worker, tak aby każda wiadomość była przetwarzana raz.

Dodatkowo, pobieranie zadania z kolejki kanału wiąże się z mutacją receiver, więc wątki potrzebują bezpiecznego sposobu na współdzielenie i modyfikowanie receiver; w przeciwnym razie mogą wystąpić warunki wyścigu (omówione w Rozdziale 16).

Przypomnij sobie inteligentne wskaźniki bezpieczne dla wątków omówione w Rozdziale 16: Aby współdzielić własność między wieloma wątkami i umożliwić wątkom mutację wartości, musimy użyć Arc<Mutex<T>>. Typ Arc pozwoli wielu instancjom Worker posiadać odbiornik, a Mutex zapewni, że tylko jeden Worker pobierze zadanie z odbiornika na raz. Listing 21-18 pokazuje zmiany, które musimy wprowadzić.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

W ThreadPool::new umieszczamy odbiornik w Arc i Mutex. Dla każdego nowego Worker, klonujemy Arc, aby zwiększyć licznik referencji, tak aby instancje Worker mogły współdzielić własność odbiornika.

Dzięki tym zmianom kod się kompiluje! Coraz bliżej!

Implementacja metody execute

Zaimplementujmy w końcu metodę execute na ThreadPool. Zmienimy również Job ze struktury na alias typu dla obiektu cechy, który przechowuje typ zamknięcia, który otrzymuje execute. Jak omówiono w sekcji „Synonimy typów i aliasy typów” w Rozdziale 20, aliasy typów pozwalają nam skracać długie typy dla ułatwienia użytkowania. Spójrz na Listing 21-19.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Po utworzeniu nowej instancji Job za pomocą zamknięcia, które otrzymujemy w execute, wysyłamy to zadanie przez koniec wysyłający kanału. Wywołujemy unwrap na send w przypadku, gdy wysyłanie się nie powiedzie. Może się to zdarzyć, jeśli na przykład zatrzymamy wszystkie nasze wątki od wykonywania, co oznacza, że koniec odbierający przestał odbierać nowe wiadomości. W tej chwili nie możemy zatrzymać naszych wątków od wykonywania: nasze wątki kontynuują wykonywanie tak długo, jak długo istnieje pula. Powodem, dla którego używamy unwrap, jest to, że wiemy, że przypadek błędu się nie zdarzy, ale kompilator tego nie wie.

Ale jeszcze nie skończyliśmy! W Worker, nasze zamknięcie przekazywane do thread::spawn nadal tylko referuje koniec odbiorczy kanału. Zamiast tego, potrzebujemy, aby zamknięcie zapętlało się w nieskończoność, prosząc koniec odbiorczy kanału o zadanie i uruchamiając zadanie, gdy je otrzyma. Wprowadźmy zmianę pokazaną na Listingu 21-20 w Worker::new.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} otrzymał zadanie; wykonuję.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Tutaj najpierw wywołujemy lock na receiver, aby uzyskać muteks, a następnie wywołujemy unwrap, aby spowodować panikę w przypadku błędów. Uzyskanie blokady może zakończyć się niepowodzeniem, jeśli muteks jest w stanie zatrucia, co może się zdarzyć, jeśli jakiś inny wątek panikował, trzymając blokadę, zamiast ją zwolnić. W tej sytuacji wywołanie unwrap w celu spowodowania paniki tego wątku jest prawidłowym działaniem. Możesz zmienić to unwrap na expect z komunikatem o błędzie, który jest dla ciebie sensowny.

Jeśli uzyskamy blokadę na muteksie, wywołujemy recv, aby otrzymać Job z kanału. Ostateczny unwrap również tutaj pomija wszelkie błędy, które mogą wystąpić, jeśli wątek posiadający nadawcę został zamknięty, podobnie jak metoda send zwraca Err, jeśli odbiornik zostanie zamknięty.

Wywołanie recv blokuje, więc jeśli nie ma jeszcze zadania, bieżący wątek będzie czekał, aż zadanie stanie się dostępne. Mutex<T> zapewnia, że tylko jeden wątek Worker w danym momencie próbuje zażądać zadania.

Nasza pula wątków jest teraz w stanie działającym! Uruchom ją za pomocą cargo run i wyślij kilka żądań:

$ cargo run
   Kompilowanie hello v0.1.0 (file:///projects/hello)
warning: pole `workers` nigdy nie jest odczytywane
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- pole w tej strukturze
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` domyślnie włączone

warning: pola `id` i `thread` nigdy nie są odczytywane
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ pola w tej strukturze
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) wygenerowało 2 ostrzeżenia
    Zakończono `dev` profil [nieoptymalny + debuginfo] cel(e) w 4.91s
     Uruchamianie `target/debug/hello`
Worker 0 otrzymał zadanie; wykonuję.
Worker 2 otrzymał zadanie; wykonuję.
Worker 1 otrzymał zadanie; wykonuję.
Worker 3 otrzymał zadanie; wykonuję.
Worker 0 otrzymał zadanie; wykonuję.
Worker 2 otrzymał zadanie; wykonuję.
Worker 1 otrzymał zadanie; wykonuję.
Worker 3 otrzymał zadanie; wykonuję.
Worker 0 otrzymał zadanie; wykonuję.
Worker 2 otrzymał zadanie; wykonuję.

Sukces! Mamy teraz pulę wątków, która wykonuje połączenia asynchronicznie. Nigdy nie jest tworzonych więcej niż cztery wątki, więc nasz system nie zostanie przeciążony, jeśli serwer otrzyma wiele żądań. Jeśli wyślemy żądanie do /sleep, serwer będzie mógł obsłużyć inne żądania, zlecając je innym wątkom.

Uwaga: Jeśli otworzysz /sleep w wielu oknach przeglądarki jednocześnie, mogą się ładować po kolei w pięciosekundowych odstępach. Niektóre przeglądarki internetowe wykonują wiele instancji tego samego żądania sekwencyjnie z powodów buforowania. To ograniczenie nie jest spowodowane przez nasz serwer WWW.

To dobry moment, aby zatrzymać się i zastanowić, jak kod z Listingów 21-18, 21-19 i 21-20 różniłby się, gdybyśmy używali przyszłości zamiast zamknięcia dla wykonywanej pracy. Jakie typy uległyby zmianie? Jak różniłyby się sygnatury metod, jeśli w ogóle? Jakie części kodu pozostałyby takie same?

Po zapoznaniu się z pętlą while let w Rozdziale 17 i Rozdziale 19, możesz się zastanawiać, dlaczego nie napisaliśmy kodu wątku Worker, jak pokazano na Listingu 21-21.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Tworzy nową pulę wątków.
    ///
    /// `size` to liczba wątków w puli.
    ///
    /// # Panics
    ///
    /// Funkcja `new` spowoduje panikę, jeśli `size` wynosi zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} otrzymał zadanie; wykonuję.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Ten kod kompiluje się i działa, ale nie prowadzi do pożądanego zachowania wątkowości: wolne żądanie nadal będzie powodować, że inne żądania będą czekać na przetworzenie. Powód jest nieco subtelny: struktura Mutex nie ma publicznej metody unlock, ponieważ własność blokady opiera się na czasie życia MutexGuard<T> wewnątrz LockResult<MutexGuard<T>>, które zwraca metoda lock. W czasie kompilacji, checker pożyczeń może wymusić zasadę, że zasób chroniony przez Mutex nie może być dostępny, chyba że posiadamy blokadę. Jednak ta implementacja może również spowodować, że blokada będzie utrzymywana dłużej niż zamierzono, jeśli nie będziemy pamiętać o czasie życia MutexGuard<T>.

Kod z Listingu 21-20, który używa let job = receiver.lock().unwrap().recv().unwrap(); działa, ponieważ w przypadku let, wszelkie tymczasowe wartości użyte w wyrażeniu po prawej stronie znaku równości są natychmiast usuwane, gdy kończy się instrukcja let. Jednak while let (oraz if let i match) nie usuwa tymczasowych wartości aż do końca powiązanego bloku. W Listingu 21-21 blokada pozostaje w posiadaniu przez cały czas trwania wywołania job(), co oznacza, że inne instancje Worker nie mogą odbierać zadań.