Stosowanie współbieżności z async
W tej sekcji zastosujemy async do niektórych z tych samych wyzwań współbieżności, które rozwiązaliśmy za pomocą wątków w Rozdziale 16. Ponieważ wiele kluczowych idei zostało tam już omówionych, w tej sekcji skupimy się na różnicach między wątkami a futures.
W wielu przypadkach API do pracy z współbieżnością przy użyciu async są bardzo podobne do tych do pracy z wątkami. W innych przypadkach są one zupełnie różne. Nawet gdy API wyglądają podobnie między wątkami a async, często mają różne zachowania — i prawie zawsze mają różne charakterystyki wydajności.
Tworzenie nowego zadania za pomocą spawn_task
Pierwszą operacją, którą zajęliśmy się w sekcji „Tworzenie nowego wątku za
pomocą spawn” w Rozdziale 16, było zliczanie
na dwóch oddzielnych wątkach. Zróbmy to samo, używając async. Crate trpl
dostarcza funkcję spawn_task, która wygląda bardzo podobnie do API
thread::spawn, oraz funkcję sleep, która jest asynchroniczną wersją API
thread::sleep. Możemy ich użyć razem do zaimplementowania przykładu
zliczania, jak pokazano w Listingu 17-6.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
}
Jako punkt wyjścia, konfigurujemy naszą funkcję main z trpl::block_on,
tak aby nasza funkcja najwyższego poziomu mogła być asynchroniczna.
Uwaga: Od tego momentu w rozdziale, każdy przykład będzie zawierał ten sam kod opakowujący z
trpl::block_onwmain, więc często będziemy go pomijać, tak jak to robimy zmain. Pamiętaj, aby uwzględnić go w swoim kodzie!
Następnie piszemy dwie pętle w tym bloku, każda zawierająca wywołanie
trpl::sleep, które czeka pół sekundy (500 milisekund) przed wysłaniem
kolejnej wiadomości. Jedną pętlę umieszczamy w ciele trpl::spawn_task, a drugą
w pętli for najwyższego poziomu. Dodajemy również await po wywołaniach
sleep.
Ten kod zachowuje się podobnie do implementacji opartej na wątkach – w tym także to, że możesz zobaczyć wiadomości pojawiające się w innej kolejności w własnym terminalu, gdy go uruchomisz:
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
Ta wersja zatrzymuje się, gdy tylko zakończy się pętla for w ciele głównego
bloku async, ponieważ zadanie uruchomione przez spawn_task zostaje
wyłączone, gdy funkcja main się kończy. Jeśli chcesz, aby działało do końca
zadania, będziesz musiał użyć uchwytu join do oczekiwania na zakończenie
pierwszego zadania. W przypadku wątków, użyliśmy metody join do „blokowania”,
dopóki wątek nie zakończył działania. W Listingu 17-7 możemy użyć await do
zrobienia tego samego, ponieważ sam uchwyt zadania jest future. Jego typ
Output to Result, więc również go rozpakowujemy po oczekiwaniu na niego.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let handle = trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
handle.await.unwrap();
});
}
Ta zaktualizowana wersja działa, dopóki obie pętle się nie zakończą:
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Jak dotąd, wydaje się, że async i wątki dają nam podobne wyniki, tylko z
inną składnią: używając await zamiast wywoływania join na uchwycie join,
oraz oczekując na wywołania sleep.
Większą różnicą jest to, że nie musieliśmy tworzyć kolejnego wątku systemu
operacyjnego, aby to zrobić. W rzeczywistości, nie musimy nawet tworzyć tutaj
żadnego zadania. Ponieważ bloki async kompilują się do anonimowych futures,
możemy umieścić każdą pętlę w bloku async i pozwolić środowisku uruchomieniowemu
uruchomić je obie do końca za pomocą funkcji trpl::join.
W sekcji „Czekanie na zakończenie wszystkich wątków” w Rozdziale 16 pokazaliśmy, jak używać metody join na typie
JoinHandle zwracanym po wywołaniu std::thread::spawn. Funkcja trpl::join
jest podobna, ale dla futures. Kiedy podasz jej dwie futures, produkuje jedną
nową future, której wynikiem jest krotka zawierająca wyniki każdej future,
którą przekazałeś, gdy obie zakończą działanie. Tak więc, w Listingu 17-8
używamy trpl::join, aby poczekać na zakończenie zarówno fut1, jak i fut2.
Nie oczekujemy na fut1 i fut2, ale zamiast tego na nową future
produkowaną przez trpl::join. Ignorujemy wynik, ponieważ jest to tylko krotka
zawierająca dwie wartości jednostkowe.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let fut1 = async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
let fut2 = async {
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
trpl::join(fut1, fut2).await;
});
}
Po uruchomieniu widzimy, że obie futures działają do końca:
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Teraz zobaczysz dokładnie tę samą kolejność za każdym razem, co jest bardzo
różne od tego, co widzieliśmy w przypadku wątków i trpl::spawn_task w Listingu
17-7. Dzieje się tak, ponieważ funkcja trpl::join jest sprawiedliwa,
oznacza to, że sprawdza każdą future tak samo często, naprzemiennie między nimi,
i nigdy nie pozwala jednej wyprzedzić drugiej, jeśli ta druga jest gotowa.
W przypadku wątków, system operacyjny decyduje, który wątek sprawdzić i jak
długo pozwolić mu działać. W przypadku async Rust, środowisko uruchomieniowe
decyduje, które zadanie sprawdzić. (W praktyce, szczegóły stają się
skomplikowane, ponieważ środowisko uruchomieniowe async może wykorzystywać
wątki systemu operacyjnego pod maską jako część sposobu zarządzania
współbieżnością, więc zagwarantowanie sprawiedliwości może być bardziej
pracochłonne dla środowiska uruchomieniowego – ale nadal jest to możliwe!)
Środowiska uruchomieniowe nie muszą gwarantować sprawiedliwości dla żadnej
danej operacji i często oferują różne API, aby umożliwić wybór, czy chcesz
sprawiedliwości, czy nie.
Wypróbuj niektóre z tych wariacji oczekiwania na futures i zobacz, co robią:
- Usuń blok async wokół jednej lub obu pętli.
- Oczekuj na każdy blok async natychmiast po jego zdefiniowaniu.
- Opakuj tylko pierwszą pętlę w blok async i oczekuj na wynikową future po ciele drugiej pętli.
Dodatkowym wyzwaniem jest sprawdzenie, czy potrafisz przewidzieć, jaki będzie wynik w każdym przypadku przed uruchomieniem kodu!
Przesyłanie danych między dwoma zadaniami za pomocą przekazywania wiadomości
Współdzielenie danych między futures również będzie znajome: ponownie użyjemy przekazywania wiadomości, ale tym razem z asynchronicznymi wersjami typów i funkcji. Obierzemy nieco inną ścieżkę niż w sekcji „Przesyłanie danych między wątkami za pomocą przekazywania wiadomości” w Rozdziale 16, aby zilustrować niektóre kluczowe różnice między współbieżnością opartą na wątkach a współbieżnością opartą na futures. W Listingu 17-9 zaczniemy od pojedynczego bloku async – nie uruchamiając oddzielnego zadania, tak jak uruchomiliśmy oddzielny wątek.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let val = String::from("hi");
tx.send(val).unwrap();
let received = rx.recv().await.unwrap();
println!("received '{received}'");
});
}
Tutaj używamy trpl::channel, asynchronicznej wersji API kanału typu
wiele-producentów, jeden-konsument, którego użyliśmy z wątkami w Rozdziale 16.
Asynchroniczna wersja API różni się tylko nieco od wersji opartej na wątkach:
używa mutowalnego, a nie niemutowalnego odbiornika rx, a jego metoda recv
produkuje future, na którą musimy czekać, zamiast bezpośrednio produkować
wartość. Teraz możemy wysyłać wiadomości z nadawcy do odbiornika. Zauważ, że
nie musimy uruchamiać oddzielnego wątku ani nawet zadania; wystarczy, że
będziemy czekać na wywołanie rx.recv.
Synchroniczna metoda Receiver::recv w std::mpsc::channel blokuje do czasu
otrzymania wiadomości. Metoda trpl::Receiver::recv tego nie robi, ponieważ
jest asynchroniczna. Zamiast blokować, przekazuje kontrolę z powrotem do
środowiska uruchomieniowego, dopóki nie zostanie odebrana wiadomość lub strona
wysyłająca kanału nie zostanie zamknięta. Natomiast nie czekamy na wywołanie
send, ponieważ nie blokuje. Nie musi, ponieważ kanał, do którego wysyłamy,
jest nieograniczony.
Uwaga: Ponieważ cały ten kod async działa w bloku async w wywołaniu
trpl::block_on, wszystko w nim może uniknąć blokowania. Jednak kod poza im blokiem będzie blokował, dopóki funkcjablock_onnie zwróci wartości. Na tym polega cała idea funkcjitrpl::block_on: pozwala wybrać, gdzie zablokować na jakimś zestawie kodu async, a tym samym, gdzie przejść między kodem synchronicznym a asynchronicznym.
Zauważ dwie rzeczy dotyczące tego przykładu. Po pierwsze, wiadomość dotrze natychmiast. Po drugie, choć używamy tutaj przyszłości, nie ma jeszcze współbieżności. Wszystko w listingu dzieje się sekwencyjnie, tak jakby nie było żadnych futures.
Zajmijmy się pierwszą częścią, wysyłając serię wiadomości i pauzując między nimi, jak pokazano w Listingu 17-10.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
Oprócz wysyłania wiadomości, musimy je odbierać. W tym przypadku, ponieważ
wiemy, ile wiadomości nadejdzie, moglibyśmy to zrobić ręcznie, wywołując
rx.recv().await cztery razy. Jednak w prawdziwym świecie zazwyczaj będziemy
czekać na nieznaną liczbę wiadomości, więc musimy czekać, aż ustalimy, że
nie ma już więcej wiadomości.
W Listingu 16-10 użyliśmy pętli for do przetwarzania wszystkich elementów
otrzymanych z kanału synchronicznego. Rust nie ma jeszcze sposobu na użycie
pętli for z asynchronicznie produkowanym strumieniem elementów, więc musimy
użyć pętli, której wcześniej nie widzieliśmy: warunkowej pętli while let.
Jest to wersja pętli konstrukcji if let, którą widzieliśmy w sekcji „Zwięzła
kontrola przepływu z if let i let...else” w
Rozdziale 6. Pętla będzie wykonywać się tak długo, jak długo określony wzorzec
będzie pasował do wartości.
Wywołanie rx.recv produkuje future, na którą czekamy. Środowisko uruchomieniowe
wstrzyma future, dopóki nie będzie gotowa. Gdy tylko nadejdzie wiadomość,
future rozstrzygnie się na Some(message) tyle razy, ile wiadomości nadejdzie.
Gdy kanał zostanie zamknięty, niezależnie od tego, czy jakiekolwiek wiadomości
nadeszły, future zamiast tego rozstrzygnie się na None, aby wskazać, że nie
ma już więcej wartości i w związku z tym powinniśmy przestać odpytywać – to
znaczy, przestać oczekiwać.
Pętla while let łączy to wszystko w całość. Jeśli wynik wywołania
rx.recv().await to Some(message), uzyskujemy dostęp do wiadomości i możemy
jej używać w ciele pętli, tak jak w przypadku if let. Jeśli wynik to None,
pętla się kończy. Za każdym razem, gdy pętla się kończy, ponownie trafia na
punkt oczekiwania, więc środowisko uruchomieniowe ponownie ją wstrzymuje,
dopóki nie nadejdzie kolejna wiadomość.
Kod teraz pomyślnie wysyła i odbiera wszystkie wiadomości. Niestety, nadal istnieją dwa problemy. Po pierwsze, wiadomości nie docierają w odstępach półsekundowych. Docierają wszystkie naraz, 2 sekundy (2000 milisekund) po uruchomieniu programu. Po drugie, ten program również nigdy się nie kończy! Zamiast tego czeka w nieskończoność na nowe wiadomości. Musisz go wyłączyć, używając ctrl-C.
Kod w jednym bloku Async wykonuje się liniowo
Zacznijmy od zbadania, dlaczego wiadomości przychodzą wszystkie naraz po pełnym
opóźnieniu, zamiast przychodzić z opóźnieniami między każdą z nich. W danym
bloku async kolejność, w jakiej słowa kluczowe await pojawiają się w kodzie,
jest również kolejnością, w jakiej są wykonywane, gdy program działa.
W Listingu 17-10 jest tylko jeden blok async, więc wszystko w nim działa
liniowo. Nadal nie ma współbieżności. Wszystkie wywołania tx.send odbywają się,
przeplatane wszystkimi wywołaniami trpl::sleep i ich powiązanymi punktami
oczekiwania. Dopiero wtedy pętla while let może przejść przez którykolwiek z
punktów oczekiwania na wywołania recv.
Aby uzyskać pożądane zachowanie, w którym opóźnienie snu następuje między
każdą wiadomością, musimy umieścić operacje tx i rx w ich własnych blokach
async, jak pokazano w Listingu 17-11. Wtedy środowisko uruchomieniowe może
wykonać każdą z nich oddzielnie, używając funkcji trpl::join, podobnie jak
w Listingu 17-8. Ponownie, czekamy na wynik wywołania trpl::join, a nie na
poszczególne futures. Gdybyśmy czekali na poszczególne futures w sekwencji,
skutkowałoby to powrotem do sekwencyjnego przepływu – dokładnie tego, czego
próbujemy nie robić.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
Dzięki zaktualizowanemu kodowi z Listingu 17-11, wiadomości są drukowane w odstępach 500 milisekund, a nie wszystkie naraz po 2 sekundach.
Przenoszenie własności do bloku async
Program nadal nigdy się nie kończy z powodu sposobu, w jaki pętla while let
współdziała z trpl::join:
- Future zwrócona przez
trpl::joinkończy się dopiero, gdy obie future, które zostały do niej przekazane, zakończą swoje działanie. - Future
tx_futkończy się, gdy zakończy spanie po wysłaniu ostatniej wiadomości wvals. - Future
rx_futnie zakończy się, dopóki pętlawhile letsię nie zakończy. - Pętla
while letnie zakończy się, dopóki oczekiwanie narx.recvnie zwróciNone. - Oczekiwanie na
rx.recvzwróciNonedopiero po zamknięciu drugiego końca kanału. - Kanał zostanie zamknięty tylko, jeśli wywołamy
rx.closelub gdy strona nadawcy,tx, zostanie usunięta. - Nigdzie nie wywołujemy
rx.close, atxnie zostanie usunięte, dopóki najbardziej zewnętrzny blok async przekazany dotrpl::block_onsię nie zakończy. - Blok nie może się zakończyć, ponieważ jest zablokowany przez zakończenie
trpl::join, co cofa nas na początek tej listy.
Obecnie blok async, w którym wysyłamy wiadomości, jedynie pożycza tx,
ponieważ wysyłanie wiadomości nie wymaga własności, ale gdybyśmy mogli
przenieść tx do tego bloku async, zostałoby ono usunięte po zakończeniu
tego bloku. W sekcji „Przechwytywanie referencji lub przenoszenie
własności” w Rozdziale 13 nauczyłeś się, jak
używać słowa kluczowego move z domknięciami, a jak omówiono w sekcji
„Używanie domknięć move z wątkami” w Rozdziale
16, często musimy przenosić dane do domknięć, pracując z wątkami. Te same
podstawowe dynamiki dotyczą bloków async, więc słowo kluczowe move działa z
blokami async tak samo, jak z domknięciami.
W Listingu 17-12 zmieniamy blok używany do wysyłania wiadomości z async na
async move.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async move {
// --snip--
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
Po uruchomieniu tej wersji kodu, zamyka się ona płynnie po wysłaniu i odebraniu ostatniej wiadomości. Następnie, zobaczmy, co musiałoby się zmienić, aby wysyłać dane z więcej niż jednej future.
Łączenie wielu futures za pomocą makra join!
Ten kanał async jest również kanałem wieloproducentowym, więc możemy wywołać
clone na tx, jeśli chcemy wysyłać wiadomości z wielu futures, jak pokazano
w Listingu 17-13.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(1500)).await;
}
};
trpl::join!(tx1_fut, tx_fut, rx_fut);
});
}
Najpierw klonujemy tx, tworząc tx1 poza pierwszym blokiem async. Przenosimy
tx1 do tego bloku, tak jak wcześniej z tx. Następnie, później, przenosimy
oryginalne tx do nowego bloku async, gdzie wysyłamy więcej wiadomości z
nieco wolniejszym opóźnieniem. Akurat umieszczamy ten nowy blok async po bloku
async do odbierania wiadomości, ale równie dobrze mógłby być przed nim. Kluczem
jest kolejność, w jakiej futures są oczekiwane, a nie w jakiej są tworzone.
Oba bloki async do wysyłania wiadomości muszą być blokami async move,
tak aby zarówno tx, jak i tx1 zostały usunięte po zakończeniu tych bloków.
W przeciwnym razie wrócimy do tej samej nieskończonej pętli, w której zaczęliśmy.
Na koniec, przełączamy się z trpl::join na trpl::join!, aby obsłużyć
dodatkowe future: makro join! oczekuje na dowolną liczbę futures, gdzie
liczbę futures znamy w czasie kompilacji. O oczekiwaniu na kolekcję nieznanej
liczby futures będziemy rozmawiać później w tym rozdziale.
Teraz widzimy wszystkie wiadomości z obu futures wysyłających, a ponieważ futures wysyłające używają nieco innych opóźnień po wysłaniu, wiadomości są również odbierane w tych różnych odstępach czasu:
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
Możesz zobaczyć wartości w innej kolejności, w zależności od twojego systemu.
To właśnie sprawia, że współbieżność jest interesująca, a także trudna. Jeśli
poeksperymentujesz z thread::sleep, nadając mu różne wartości w różnych
wątkach, każde uruchomienie będzie bardziej niedeterministyczne i za każdym
razem będzie generować inne dane wyjściowe.
Teraz, gdy przyjrzeliśmy się, jak działają kanały, spójrzmy na inną metodę współbieżności.