Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Stosowanie współbieżności z async

W tej sekcji zastosujemy async do niektórych z tych samych wyzwań współbieżności, które rozwiązaliśmy za pomocą wątków w Rozdziale 16. Ponieważ wiele kluczowych idei zostało tam już omówionych, w tej sekcji skupimy się na różnicach między wątkami a futures.

W wielu przypadkach API do pracy z współbieżnością przy użyciu async są bardzo podobne do tych do pracy z wątkami. W innych przypadkach są one zupełnie różne. Nawet gdy API wyglądają podobnie między wątkami a async, często mają różne zachowania — i prawie zawsze mają różne charakterystyki wydajności.

Tworzenie nowego zadania za pomocą spawn_task

Pierwszą operacją, którą zajęliśmy się w sekcji „Tworzenie nowego wątku za pomocą spawn w Rozdziale 16, było zliczanie na dwóch oddzielnych wątkach. Zróbmy to samo, używając async. Crate trpl dostarcza funkcję spawn_task, która wygląda bardzo podobnie do API thread::spawn, oraz funkcję sleep, która jest asynchroniczną wersją API thread::sleep. Możemy ich użyć razem do zaimplementowania przykładu zliczania, jak pokazano w Listingu 17-6.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

Jako punkt wyjścia, konfigurujemy naszą funkcję main z trpl::block_on, tak aby nasza funkcja najwyższego poziomu mogła być asynchroniczna.

Uwaga: Od tego momentu w rozdziale, każdy przykład będzie zawierał ten sam kod opakowujący z trpl::block_on w main, więc często będziemy go pomijać, tak jak to robimy z main. Pamiętaj, aby uwzględnić go w swoim kodzie!

Następnie piszemy dwie pętle w tym bloku, każda zawierająca wywołanie trpl::sleep, które czeka pół sekundy (500 milisekund) przed wysłaniem kolejnej wiadomości. Jedną pętlę umieszczamy w ciele trpl::spawn_task, a drugą w pętli for najwyższego poziomu. Dodajemy również await po wywołaniach sleep.

Ten kod zachowuje się podobnie do implementacji opartej na wątkach – w tym także to, że możesz zobaczyć wiadomości pojawiające się w innej kolejności w własnym terminalu, gdy go uruchomisz:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

Ta wersja zatrzymuje się, gdy tylko zakończy się pętla for w ciele głównego bloku async, ponieważ zadanie uruchomione przez spawn_task zostaje wyłączone, gdy funkcja main się kończy. Jeśli chcesz, aby działało do końca zadania, będziesz musiał użyć uchwytu join do oczekiwania na zakończenie pierwszego zadania. W przypadku wątków, użyliśmy metody join do „blokowania”, dopóki wątek nie zakończył działania. W Listingu 17-7 możemy użyć await do zrobienia tego samego, ponieważ sam uchwyt zadania jest future. Jego typ Output to Result, więc również go rozpakowujemy po oczekiwaniu na niego.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

Ta zaktualizowana wersja działa, dopóki obie pętle się nie zakończą:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Jak dotąd, wydaje się, że async i wątki dają nam podobne wyniki, tylko z inną składnią: używając await zamiast wywoływania join na uchwycie join, oraz oczekując na wywołania sleep.

Większą różnicą jest to, że nie musieliśmy tworzyć kolejnego wątku systemu operacyjnego, aby to zrobić. W rzeczywistości, nie musimy nawet tworzyć tutaj żadnego zadania. Ponieważ bloki async kompilują się do anonimowych futures, możemy umieścić każdą pętlę w bloku async i pozwolić środowisku uruchomieniowemu uruchomić je obie do końca za pomocą funkcji trpl::join.

W sekcji „Czekanie na zakończenie wszystkich wątków” w Rozdziale 16 pokazaliśmy, jak używać metody join na typie JoinHandle zwracanym po wywołaniu std::thread::spawn. Funkcja trpl::join jest podobna, ale dla futures. Kiedy podasz jej dwie futures, produkuje jedną nową future, której wynikiem jest krotka zawierająca wyniki każdej future, którą przekazałeś, gdy obie zakończą działanie. Tak więc, w Listingu 17-8 używamy trpl::join, aby poczekać na zakończenie zarówno fut1, jak i fut2. Nie oczekujemy na fut1 i fut2, ale zamiast tego na nową future produkowaną przez trpl::join. Ignorujemy wynik, ponieważ jest to tylko krotka zawierająca dwie wartości jednostkowe.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

Po uruchomieniu widzimy, że obie futures działają do końca:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Teraz zobaczysz dokładnie tę samą kolejność za każdym razem, co jest bardzo różne od tego, co widzieliśmy w przypadku wątków i trpl::spawn_task w Listingu 17-7. Dzieje się tak, ponieważ funkcja trpl::join jest sprawiedliwa, oznacza to, że sprawdza każdą future tak samo często, naprzemiennie między nimi, i nigdy nie pozwala jednej wyprzedzić drugiej, jeśli ta druga jest gotowa. W przypadku wątków, system operacyjny decyduje, który wątek sprawdzić i jak długo pozwolić mu działać. W przypadku async Rust, środowisko uruchomieniowe decyduje, które zadanie sprawdzić. (W praktyce, szczegóły stają się skomplikowane, ponieważ środowisko uruchomieniowe async może wykorzystywać wątki systemu operacyjnego pod maską jako część sposobu zarządzania współbieżnością, więc zagwarantowanie sprawiedliwości może być bardziej pracochłonne dla środowiska uruchomieniowego – ale nadal jest to możliwe!) Środowiska uruchomieniowe nie muszą gwarantować sprawiedliwości dla żadnej danej operacji i często oferują różne API, aby umożliwić wybór, czy chcesz sprawiedliwości, czy nie.

Wypróbuj niektóre z tych wariacji oczekiwania na futures i zobacz, co robią:

  • Usuń blok async wokół jednej lub obu pętli.
  • Oczekuj na każdy blok async natychmiast po jego zdefiniowaniu.
  • Opakuj tylko pierwszą pętlę w blok async i oczekuj na wynikową future po ciele drugiej pętli.

Dodatkowym wyzwaniem jest sprawdzenie, czy potrafisz przewidzieć, jaki będzie wynik w każdym przypadku przed uruchomieniem kodu!

Przesyłanie danych między dwoma zadaniami za pomocą przekazywania wiadomości

Współdzielenie danych między futures również będzie znajome: ponownie użyjemy przekazywania wiadomości, ale tym razem z asynchronicznymi wersjami typów i funkcji. Obierzemy nieco inną ścieżkę niż w sekcji „Przesyłanie danych między wątkami za pomocą przekazywania wiadomości” w Rozdziale 16, aby zilustrować niektóre kluczowe różnice między współbieżnością opartą na wątkach a współbieżnością opartą na futures. W Listingu 17-9 zaczniemy od pojedynczego bloku async – nie uruchamiając oddzielnego zadania, tak jak uruchomiliśmy oddzielny wątek.

extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}

Tutaj używamy trpl::channel, asynchronicznej wersji API kanału typu wiele-producentów, jeden-konsument, którego użyliśmy z wątkami w Rozdziale 16. Asynchroniczna wersja API różni się tylko nieco od wersji opartej na wątkach: używa mutowalnego, a nie niemutowalnego odbiornika rx, a jego metoda recv produkuje future, na którą musimy czekać, zamiast bezpośrednio produkować wartość. Teraz możemy wysyłać wiadomości z nadawcy do odbiornika. Zauważ, że nie musimy uruchamiać oddzielnego wątku ani nawet zadania; wystarczy, że będziemy czekać na wywołanie rx.recv.

Synchroniczna metoda Receiver::recv w std::mpsc::channel blokuje do czasu otrzymania wiadomości. Metoda trpl::Receiver::recv tego nie robi, ponieważ jest asynchroniczna. Zamiast blokować, przekazuje kontrolę z powrotem do środowiska uruchomieniowego, dopóki nie zostanie odebrana wiadomość lub strona wysyłająca kanału nie zostanie zamknięta. Natomiast nie czekamy na wywołanie send, ponieważ nie blokuje. Nie musi, ponieważ kanał, do którego wysyłamy, jest nieograniczony.

Uwaga: Ponieważ cały ten kod async działa w bloku async w wywołaniu trpl::block_on, wszystko w nim może uniknąć blokowania. Jednak kod poza im blokiem będzie blokował, dopóki funkcja block_on nie zwróci wartości. Na tym polega cała idea funkcji trpl::block_on: pozwala wybrać, gdzie zablokować na jakimś zestawie kodu async, a tym samym, gdzie przejść między kodem synchronicznym a asynchronicznym.

Zauważ dwie rzeczy dotyczące tego przykładu. Po pierwsze, wiadomość dotrze natychmiast. Po drugie, choć używamy tutaj przyszłości, nie ma jeszcze współbieżności. Wszystko w listingu dzieje się sekwencyjnie, tak jakby nie było żadnych futures.

Zajmijmy się pierwszą częścią, wysyłając serię wiadomości i pauzując między nimi, jak pokazano w Listingu 17-10.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

Oprócz wysyłania wiadomości, musimy je odbierać. W tym przypadku, ponieważ wiemy, ile wiadomości nadejdzie, moglibyśmy to zrobić ręcznie, wywołując rx.recv().await cztery razy. Jednak w prawdziwym świecie zazwyczaj będziemy czekać na nieznaną liczbę wiadomości, więc musimy czekać, aż ustalimy, że nie ma już więcej wiadomości.

W Listingu 16-10 użyliśmy pętli for do przetwarzania wszystkich elementów otrzymanych z kanału synchronicznego. Rust nie ma jeszcze sposobu na użycie pętli for z asynchronicznie produkowanym strumieniem elementów, więc musimy użyć pętli, której wcześniej nie widzieliśmy: warunkowej pętli while let. Jest to wersja pętli konstrukcji if let, którą widzieliśmy w sekcji „Zwięzła kontrola przepływu z if let i let...else w Rozdziale 6. Pętla będzie wykonywać się tak długo, jak długo określony wzorzec będzie pasował do wartości.

Wywołanie rx.recv produkuje future, na którą czekamy. Środowisko uruchomieniowe wstrzyma future, dopóki nie będzie gotowa. Gdy tylko nadejdzie wiadomość, future rozstrzygnie się na Some(message) tyle razy, ile wiadomości nadejdzie. Gdy kanał zostanie zamknięty, niezależnie od tego, czy jakiekolwiek wiadomości nadeszły, future zamiast tego rozstrzygnie się na None, aby wskazać, że nie ma już więcej wartości i w związku z tym powinniśmy przestać odpytywać – to znaczy, przestać oczekiwać.

Pętla while let łączy to wszystko w całość. Jeśli wynik wywołania rx.recv().await to Some(message), uzyskujemy dostęp do wiadomości i możemy jej używać w ciele pętli, tak jak w przypadku if let. Jeśli wynik to None, pętla się kończy. Za każdym razem, gdy pętla się kończy, ponownie trafia na punkt oczekiwania, więc środowisko uruchomieniowe ponownie ją wstrzymuje, dopóki nie nadejdzie kolejna wiadomość.

Kod teraz pomyślnie wysyła i odbiera wszystkie wiadomości. Niestety, nadal istnieją dwa problemy. Po pierwsze, wiadomości nie docierają w odstępach półsekundowych. Docierają wszystkie naraz, 2 sekundy (2000 milisekund) po uruchomieniu programu. Po drugie, ten program również nigdy się nie kończy! Zamiast tego czeka w nieskończoność na nowe wiadomości. Musisz go wyłączyć, używając ctrl-C.

Kod w jednym bloku Async wykonuje się liniowo

Zacznijmy od zbadania, dlaczego wiadomości przychodzą wszystkie naraz po pełnym opóźnieniu, zamiast przychodzić z opóźnieniami między każdą z nich. W danym bloku async kolejność, w jakiej słowa kluczowe await pojawiają się w kodzie, jest również kolejnością, w jakiej są wykonywane, gdy program działa.

W Listingu 17-10 jest tylko jeden blok async, więc wszystko w nim działa liniowo. Nadal nie ma współbieżności. Wszystkie wywołania tx.send odbywają się, przeplatane wszystkimi wywołaniami trpl::sleep i ich powiązanymi punktami oczekiwania. Dopiero wtedy pętla while let może przejść przez którykolwiek z punktów oczekiwania na wywołania recv.

Aby uzyskać pożądane zachowanie, w którym opóźnienie snu następuje między każdą wiadomością, musimy umieścić operacje tx i rx w ich własnych blokach async, jak pokazano w Listingu 17-11. Wtedy środowisko uruchomieniowe może wykonać każdą z nich oddzielnie, używając funkcji trpl::join, podobnie jak w Listingu 17-8. Ponownie, czekamy na wynik wywołania trpl::join, a nie na poszczególne futures. Gdybyśmy czekali na poszczególne futures w sekwencji, skutkowałoby to powrotem do sekwencyjnego przepływu – dokładnie tego, czego próbujemy nie robić.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Dzięki zaktualizowanemu kodowi z Listingu 17-11, wiadomości są drukowane w odstępach 500 milisekund, a nie wszystkie naraz po 2 sekundach.

Przenoszenie własności do bloku async

Program nadal nigdy się nie kończy z powodu sposobu, w jaki pętla while let współdziała z trpl::join:

  • Future zwrócona przez trpl::join kończy się dopiero, gdy obie future, które zostały do niej przekazane, zakończą swoje działanie.
  • Future tx_fut kończy się, gdy zakończy spanie po wysłaniu ostatniej wiadomości w vals.
  • Future rx_fut nie zakończy się, dopóki pętla while let się nie zakończy.
  • Pętla while let nie zakończy się, dopóki oczekiwanie na rx.recv nie zwróci None.
  • Oczekiwanie na rx.recv zwróci None dopiero po zamknięciu drugiego końca kanału.
  • Kanał zostanie zamknięty tylko, jeśli wywołamy rx.close lub gdy strona nadawcy, tx, zostanie usunięta.
  • Nigdzie nie wywołujemy rx.close, a tx nie zostanie usunięte, dopóki najbardziej zewnętrzny blok async przekazany do trpl::block_on się nie zakończy.
  • Blok nie może się zakończyć, ponieważ jest zablokowany przez zakończenie trpl::join, co cofa nas na początek tej listy.

Obecnie blok async, w którym wysyłamy wiadomości, jedynie pożycza tx, ponieważ wysyłanie wiadomości nie wymaga własności, ale gdybyśmy mogli przenieść tx do tego bloku async, zostałoby ono usunięte po zakończeniu tego bloku. W sekcji „Przechwytywanie referencji lub przenoszenie własności” w Rozdziale 13 nauczyłeś się, jak używać słowa kluczowego move z domknięciami, a jak omówiono w sekcji „Używanie domknięć move z wątkami” w Rozdziale 16, często musimy przenosić dane do domknięć, pracując z wątkami. Te same podstawowe dynamiki dotyczą bloków async, więc słowo kluczowe move działa z blokami async tak samo, jak z domknięciami.

W Listingu 17-12 zmieniamy blok używany do wysyłania wiadomości z async na async move.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Po uruchomieniu tej wersji kodu, zamyka się ona płynnie po wysłaniu i odebraniu ostatniej wiadomości. Następnie, zobaczmy, co musiałoby się zmienić, aby wysyłać dane z więcej niż jednej future.

Łączenie wielu futures za pomocą makra join!

Ten kanał async jest również kanałem wieloproducentowym, więc możemy wywołać clone na tx, jeśli chcemy wysyłać wiadomości z wielu futures, jak pokazano w Listingu 17-13.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}

Najpierw klonujemy tx, tworząc tx1 poza pierwszym blokiem async. Przenosimy tx1 do tego bloku, tak jak wcześniej z tx. Następnie, później, przenosimy oryginalne tx do nowego bloku async, gdzie wysyłamy więcej wiadomości z nieco wolniejszym opóźnieniem. Akurat umieszczamy ten nowy blok async po bloku async do odbierania wiadomości, ale równie dobrze mógłby być przed nim. Kluczem jest kolejność, w jakiej futures są oczekiwane, a nie w jakiej są tworzone.

Oba bloki async do wysyłania wiadomości muszą być blokami async move, tak aby zarówno tx, jak i tx1 zostały usunięte po zakończeniu tych bloków. W przeciwnym razie wrócimy do tej samej nieskończonej pętli, w której zaczęliśmy.

Na koniec, przełączamy się z trpl::join na trpl::join!, aby obsłużyć dodatkowe future: makro join! oczekuje na dowolną liczbę futures, gdzie liczbę futures znamy w czasie kompilacji. O oczekiwaniu na kolekcję nieznanej liczby futures będziemy rozmawiać później w tym rozdziale.

Teraz widzimy wszystkie wiadomości z obu futures wysyłających, a ponieważ futures wysyłające używają nieco innych opóźnień po wysłaniu, wiadomości są również odbierane w tych różnych odstępach czasu:

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

Możesz zobaczyć wartości w innej kolejności, w zależności od twojego systemu. To właśnie sprawia, że współbieżność jest interesująca, a także trudna. Jeśli poeksperymentujesz z thread::sleep, nadając mu różne wartości w różnych wątkach, każde uruchomienie będzie bardziej niedeterministyczne i za każdym razem będzie generować inne dane wyjściowe.

Teraz, gdy przyjrzeliśmy się, jak działają kanały, spójrzmy na inną metodę współbieżności.