Strumienie: Futures w sekwencji
Przypomnijmy, jak używaliśmy odbiornika dla naszego asynchronicznego kanału wcześniej w tym rozdziale w sekcji „Przekazywanie wiadomości”. Asynchroniczna metoda recv wytwarza sekwencję elementów w czasie. Jest to instancja znacznie bardziej ogólnego wzorca znanego jako strumień. Wiele koncepcji jest naturalnie reprezentowanych jako strumienie: elementy stają się dostępne w kolejce, fragmenty danych są pobierane przyrostowo z systemu plików, gdy pełny zestaw danych jest zbyt duży dla pamięci komputera, lub dane przychodzą przez sieć w czasie. Ponieważ strumienie są futures, możemy ich używać z dowolnym innym rodzajem future i łączyć je w interesujące sposoby. Na przykład, możemy grupować zdarzenia, aby uniknąć wywoływania zbyt wielu wywołań sieciowych, ustawiać limity czasu na sekwencje długotrwałych operacji lub ograniczać zdarzenia interfejsu użytkownika, aby uniknąć wykonywania zbędnej pracy.
Widzieliśmy sekwencję elementów w Rozdziale 13, kiedy przyglądaliśmy się cechom Iterator w sekcji „Cechy Iterator i metoda next”, ale istnieją dwie różnice między iteratorami a asynchronicznym odbiornikiem kanału. Pierwsza różnica to czas: iteratory są synchroniczne, podczas gdy odbiornik kanału jest asynchroniczny. Druga różnica to API. Pracując bezpośrednio z Iterator, wywołujemy jego synchroniczną metodę next. W przypadku strumienia trpl::Receiver w szczególności, zamiast tego wywołaliśmy asynchroniczną metodę recv. Poza tym te API są bardzo podobne, a to podobieństwo nie jest przypadkowe. Strumień jest jak asynchroniczna forma iteracji. Podczas gdy trpl::Receiver w szczególności czeka na odebranie wiadomości, ogólne API strumienia jest znacznie szersze: dostarcza następny element w taki sam sposób jak Iterator, ale asynchronicznie.
Podobieństwo między iteratorami a strumieniami w Rust oznacza, że faktycznie możemy stworzyć strumień z dowolnego iteratora. Podobnie jak w przypadku iteratora, możemy pracować ze strumieniem, wywołując jego metodę next, a następnie oczekując na wynik, jak w Listing 17-21, który jeszcze się nie skompiluje.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Zaczynamy od tablicy liczb, którą przekształcamy w iterator, a następnie wywołujemy map, aby podwoić wszystkie wartości. Następnie przekształcamy iterator w strumień za pomocą funkcji trpl::stream_from_iter. Dalej pętlujemy po elementach w strumieniu, gdy te docierają, za pomocą pętli while let.
Niestety, kiedy próbujemy uruchomić ten kod, nie kompiluje się on, lecz zgłasza brak metody next:
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
Jak wyjaśnia ten wynik, przyczyną błędu kompilacji jest to, że do użycia metody next potrzebujemy odpowiedniej cechy w zasięgu. Biorąc pod uwagę naszą dotychczasową dyskusję, można by rozsądnie oczekiwać, że będzie to cecha Stream, ale w rzeczywistości jest to StreamExt. Skrót od extension (rozszerzenie), Ext to powszechny wzorzec w społeczności Rust służący do rozszerzania jednej cechy inną.
Cecha Stream definiuje niskopoziomowy interfejs, który skutecznie łączy cechy Iterator i Future. StreamExt dostarcza wyższopoziomowy zestaw API ponad Stream, w tym metodę next, a także inne metody narzędziowe podobne do tych dostarczanych przez cechę Iterator. Stream i StreamExt nie są jeszcze częścią standardowej biblioteki Rust, ale większość składowych ekosystemu używa podobnych definicji.
Naprawą błędu kompilatora jest dodanie instrukcji use dla trpl::StreamExt, jak w Listing 17-22.
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// --snip--
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Po połączeniu wszystkich tych elementów ten kod działa tak, jak chcemy! Co więcej, teraz, gdy mamy StreamExt w zasięgu, możemy używać wszystkich jego metod narzędziowych, tak samo jak w przypadku iteratorów.