Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Przesyłanie danych między wątkami za pomocą przekazywania wiadomości

Jednym z coraz popularniejszych podejść do zapewnienia bezpiecznej współbieżności jest przekazywanie wiadomości, gdzie wątki lub aktorzy komunikują się, wysyłając sobie wiadomości zawierające dane. Oto idea w haśle z dokumentacji języka Go: „Nie komunikuj się poprzez współdzielenie pamięci; zamiast tego współdziel pamięć poprzez komunikację.”

Aby zrealizować współbieżność opartą na przekazywaniu wiadomości, biblioteka standardowa Rust dostarcza implementację kanałów. Kanał to ogólna koncepcja programistyczna, za pomocą której dane są przesyłane z jednego wątku do drugiego.

Możesz wyobrazić sobie kanał w programowaniu jako kierunkowy kanał wodny, taki jak strumień lub rzeka. Jeśli wrzucisz coś, na przykład gumową kaczkę, do rzeki, będzie ona płynęła w dół rzeki aż do końca szlaku wodnego.

Kanał ma dwie połówki: nadajnik i odbiornik. Połówka nadajnika to miejsce w górze rzeki, gdzie wrzucasz gumową kaczkę, a połówka odbiornika to miejsce, gdzie gumowa kaczka dociera w dół rzeki. Jedna część twojego kodu wywołuje metody na nadajniku z danymi, które chcesz wysłać, a inna część sprawdza koniec odbiorczy pod kątem nadchodzących wiadomości. Kanał jest uważany za zamknięty, jeśli którakolwiek z połówek – nadajnik lub odbiornik – zostanie usunięta.

Tutaj stworzymy program, który będzie miał jeden wątek do generowania wartości i wysyłania ich przez kanał, oraz inny wątek, który będzie odbierał te wartości i wypisywał je. Będziemy przesyłać proste wartości między wątkami za pomocą kanału, aby zilustrować tę funkcję. Gdy już zapoznasz się z tą techniką, będziesz mógł używać kanałów dla dowolnych wątków, które muszą się ze sobą komunikować, np. w systemie czatu lub w systemie, gdzie wiele wątków wykonuje części obliczeń i wysyła je do jednego wątku, który agreguje wyniki.

Najpierw, w Listingu 16-6, stworzymy kanał, ale nic z nim nie zrobimy. Zauważ, że to jeszcze się nie skompiluje, ponieważ Rust nie potrafi określić, jakiego typu wartości chcemy przesyłać przez kanał.

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Tworzymy nowy kanał za pomocą funkcji mpsc::channel; mpsc oznacza multiple producer, single consumer (wielu producentów, jeden konsument). W skrócie, sposób, w jaki biblioteka standardowa Rust implementuje kanały, oznacza, że kanał może mieć wiele końców wysyłających, które produkują wartości, ale tylko jeden koniec odbierający, który te wartości konsumuje. Wyobraź sobie wiele strumieni spływających do jednej dużej rzeki: Wszystko, co zostanie wysłane w dół któregokolwiek ze strumieni, znajdzie się w jednej rzece na końcu. Na razie zaczniemy od jednego producenta, ale dodamy wielu producentów, gdy ten przykład zadziała.

Funkcja mpsc::channel zwraca krotkę, której pierwszy element to koniec wysyłający – nadajnik – a drugi element to koniec odbierający – odbiornik. Skróty tx i rx są tradycyjnie używane w wielu dziedzinach odpowiednio dla transmitter (nadajnik) i receiver (odbiornik), więc tak nazywamy nasze zmienne, aby wskazać każdy koniec. Używamy instrukcji let ze wzorcem, który dekomponuje krotki; o użyciu wzorców w instrukcjach let i dekompozycji będziemy rozmawiać w Rozdziale 19. Na razie wiedz, że użycie instrukcji let w ten sposób jest wygodnym podejściem do wyodrębniania elementów krotki zwracanej przez mpsc::channel.

Przenieśmy koniec nadawczy do utworzonego wątku i niech wyśle jeden ciąg znaków, tak aby utworzony wątek komunikował się z głównym wątkiem, jak pokazano w Listingu 16-7. To jest jak wrzucenie gumowej kaczki do rzeki w górnym biegu lub wysłanie wiadomości na czacie z jednego wątku do drugiego.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Ponownie używamy thread::spawn do utworzenia nowego wątku, a następnie używamy move do przeniesienia tx do domknięcia, tak aby utworzony wątek był właścicielem tx. Utworzony wątek musi być właścicielem nadajnika, aby móc wysyłać wiadomości przez kanał.

Nadajnik ma metodę send, która przyjmuje wartość, którą chcemy wysłać. Metoda send zwraca typ Result<T, E>, więc jeśli odbiornik został już usunięty i nie ma dokąd wysłać wartości, operacja wysyłania zwróci błąd. W tym przykładzie wywołujemy unwrap, aby spowodować panikę w przypadku błędu. Ale w prawdziwej aplikacji obsłużylibyśmy to poprawnie: Wróć do Rozdziału 9, aby przejrzeć strategie właściwej obsługi błędów.

W Listingu 16-8 pobierzemy wartość z odbiornika w głównym wątku. To jest jak pobieranie gumowej kaczki z wody na końcu rzeki lub odbieranie wiadomości na czacie.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Odbiornik ma dwie przydatne metody: recv i try_recv. Używamy recv, skrótu od receive (odbierz), która zablokuje wykonanie głównego wątku i poczeka, aż wartość zostanie wysłana przez kanał. Gdy tylko wartość zostanie wysłana, recv zwróci ją w Result<T, E>. Gdy nadajnik zostanie zamknięty, recv zwróci błąd, sygnalizując, że nie nadejdą już więcej wartości.

Metoda try_recv nie blokuje, ale zamiast tego natychmiast zwraca Result<T, E>: wartość Ok zawierającą wiadomość, jeśli jest dostępna, oraz wartość Err, jeśli tym razem nie ma żadnych wiadomości. Użycie try_recv jest przydatne, jeśli ten wątek ma inną pracę do wykonania podczas oczekiwania na wiadomości: Moglibyśmy napisać pętlę, która co jakiś czas wywołuje try_recv, obchodzi wiadomość, jeśli jest dostępna, a w przeciwnym razie wykonuje inną pracę przez jakiś czas, zanim ponownie sprawdzi.

W tym przykładzie dla uproszczenia użyliśmy recv; główny wątek nie ma innej pracy do wykonania poza czekaniem na wiadomości, więc blokowanie głównego wątku jest odpowiednie.

Po uruchomieniu kodu z Listingu 16-8 zobaczymy wartość wypisaną z głównego wątku:

Got: hi

Idealnie!

Przenoszenie własności przez kanały

Zasady własności odgrywają kluczową rolę w przesyłaniu wiadomości, ponieważ pomagają pisać bezpieczny, współbieżny kod. Zapobieganie błędom w programowaniu współbieżnym to zaleta myślenia o własności w całym programie Rust. Przeprowadźmy eksperyment, aby pokazać, jak kanały i własność współpracują, aby zapobiegać problemom: Spróbujemy użyć wartości val w utworzonym wątku po tym, jak wysłaliśmy ją przez kanał. Spróbuj skompilować kod z Listingu 16-9, aby zobaczyć, dlaczego ten kod jest niedozwolony.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Tutaj próbujemy wypisać val po tym, jak wysłaliśmy go przez kanał za pomocą tx.send. Zezwolenie na to byłoby złym pomysłem: Gdy wartość zostanie wysłana do innego wątku, ten wątek mógłby ją zmodyfikować lub usunąć, zanim ponownie spróbujemy jej użyć. Potencjalnie modyfikacje innego wątku mogłyby spowodować błędy lub nieoczekiwane rezultaty z powodu niespójnych lub nieistniejących danych. Jednak Rust zgłasza błąd, jeśli spróbujemy skompilować kod z Listingu 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:27
   |
 8 |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
 9 |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

Nasz błąd współbieżności spowodował błąd kompilacji. Funkcja send przejmuje własność swojego parametru, a po przeniesieniu wartości odbiornik przejmuje jej własność. To uniemożliwia nam przypadkowe ponowne użycie wartości po jej wysłaniu; system własności sprawdza, czy wszystko jest w porządku.

Wysyłanie wielu wartości

Kod z Listingu 16-8 skompilował się i zadziałał, ale nie pokazał nam jasno, że dwa oddzielne wątki komunikowały się ze sobą przez kanał.

W Listingu 16-10 wprowadziliśmy pewne modyfikacje, które udowodnią, że kod z Listingu 16-8 działa współbieżnie: utworzony wątek będzie teraz wysyłał wiele wiadomości i pauzował na sekundę między każdą wiadomością.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Tym razem utworzony wątek ma wektor ciągów, które chcemy wysłać do głównego wątku. Iterujemy po nich, wysyłając każdy z osobna i pauzując między każdym, wywołując funkcję thread::sleep z wartością Duration wynoszącą jedną sekundę.

W głównym wątku nie wywołujemy już jawnie funkcji recv: Zamiast tego traktujemy rx jako iterator. Dla każdej otrzymanej wartości wypisujemy ją. Gdy kanał zostanie zamknięty, iteracja się zakończy.

Podczas uruchamiania kodu z Listingu 16-10 powinieneś zobaczyć następujący wynik z jednominutową przerwą między każdą linią:

Got: hi
Got: from
Got: the
Got: thread

Ponieważ nie mamy żadnego kodu, który pauzuje lub opóźnia w pętli for w głównym wątku, możemy stwierdzić, że główny wątek czeka na odebranie wartości z utworzonego wątku.

Tworzenie wielu producentów

Wcześniej wspomnieliśmy, że mpsc to skrót od multiple producer, single consumer (wiele producentów, jeden konsument). Wykorzystajmy mpsc i rozszerzmy kod z Listingu 16-10, aby utworzyć wiele wątków, które wszystkie wysyłają wartości do tego samego odbiornika. Możemy to zrobić, klonując nadajnik, jak pokazano w Listingu 16-11.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}

Tym razem, zanim utworzymy pierwszy wątek, wywołujemy clone na nadajniku. Spowoduje to utworzenie nowego nadajnika, który możemy przekazać do pierwszego utworzonego wątku. Oryginalny nadajnik przekazujemy do drugiego utworzonego wątku. Daje nam to dwa wątki, każdy wysyłający różne wiadomości do jednego odbiornika.

Po uruchomieniu kodu, wynik powinien wyglądać mniej więcej tak:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Możesz zobaczyć wartości w innej kolejności, w zależności od twojego systemu. To właśnie sprawia, że współbieżność jest interesująca, a także trudna. Jeśli poeksperymentujesz z thread::sleep, nadając mu różne wartości w różnych wątkach, każde uruchomienie będzie bardziej niedeterministyczne i za każdym razem będzie generować inne dane wyjściowe.

Teraz, gdy przyjrzeliśmy się, jak działają kanały, spójrzmy na inną metodę współbieżności.