Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Przekazywanie kontroli do środowiska wykonawczego

Przypomnijmy z sekcji „Nasz pierwszy program asynchroniczny”, że w każdym punkcie await Rust daje środowisku wykonawczemu szansę na wstrzymanie zadania i przełączenie się na inne, jeśli oczekiwany future nie jest gotowy. Odwrotność jest również prawdziwa: Rust tylko wstrzymuje bloki asynchroniczne i przekazuje kontrolę środowisku wykonawczemu w punkcie await. Wszystko pomiędzy punktami await jest synchroniczne.

Oznacza to, że jeśli wykonasz dużo pracy w bloku asynchronicznym bez punktu await, ten future zablokuje postęp innych futures. Czasami usłyszysz, że jeden future zagładza inne futures. W niektórych przypadkach może to nie być duży problem. Jednak jeśli wykonujesz jakąś kosztowną konfigurację lub długotrwałą pracę, albo jeśli masz future, które będzie wykonywać pewne zadanie w nieskończoność, będziesz musiał pomyśleć o tym, kiedy i gdzie przekazać kontrolę środowisku wykonawczemu.

Zasymulujmy długotrwałą operację, aby zilustrować problem zagładzania, a następnie zbadajmy, jak go rozwiązać. Listing 17-14 wprowadza funkcję slow.

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Ten kod używa std::thread::sleep zamiast trpl::sleep, więc wywołanie slow zablokuje bieżący wątek na określoną liczbę milisekund. Możemy użyć slow do reprezentowania rzeczywistych operacji, które są zarówno długotrwałe, jak i blokujące.

W Listing 17-15 używamy slow do emulacji tego rodzaju pracy związanej z CPU w parze futures.

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Każdy future przekazuje kontrolę środowisku wykonawczemu dopiero po wykonaniu szeregu wolnych operacji. Jeśli uruchomisz ten kod, zobaczysz następujący wynik:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

Podobnie jak w Listing 17-5, gdzie użyliśmy trpl::select do współzawodnictwa futures pobierających dwa adresy URL, select nadal kończy działanie, gdy tylko a zostanie zakończone. Nie ma jednak przeplatania między wywołaniami slow w dwóch futures. Future a wykonuje całą swoją pracę, dopóki nie zostanie oczekiwane wywołanie trpl::sleep, następnie future b wykonuje całą swoją pracę, dopóki nie zostanie oczekiwane jego własne wywołanie trpl::sleep, a na koniec future a zostaje zakończone. Aby umożliwić obu futures postęp między ich wolnymi zadaniami, potrzebujemy punktów await, abyśmy mogli przekazać kontrolę środowisku wykonawczemu. Oznacza to, że potrzebujemy czegoś, na co możemy czekać!

Już widzimy, jak tego rodzaju przekazywanie dzieje się w Listing 17-15: gdybyśmy usunęli trpl::sleep na końcu future a, zostałoby ono zakończone bez jakiegokolwiek uruchomienia future b. Spróbujmy użyć funkcji trpl::sleep jako punktu wyjścia do umożliwienia operacjom przełączania się w celu osiągania postępu, jak pokazano w Listing 17-16.

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Dodaliśmy wywołania trpl::sleep z punktami await między każdym wywołaniem slow. Teraz praca obu futures jest przeplatana:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

Future a nadal działa przez chwilę, zanim przekaże kontrolę do b, ponieważ wywołuje slow przed wywołaniem trpl::sleep, ale potem futures zamieniają się miejscami za każdym razem, gdy jedna z nich osiąga punkt await. W tym przypadku zrobiliśmy to po każdym wywołaniu slow, ale mogliśmy podzielić pracę w sposób, który miałby dla nas największy sens.

Nie chcemy jednak tutaj faktycznie spać: chcemy postępować tak szybko, jak to możliwe. Po prostu musimy przekazać kontrolę środowisku wykonawczemu. Możemy to zrobić bezpośrednio, używając funkcji trpl::yield_now. W Listing 17-17 zastępujemy wszystkie te wywołania trpl::sleep wywołaniami trpl::yield_now.

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Ten kod jest zarówno jaśniejszy pod względem rzeczywistego zamiaru, jak i może być znacznie szybszy niż użycie sleep, ponieważ timery takie jak ten używany przez sleep często mają ograniczenia co do tego, jak granularne mogą być. Na przykład, wersja sleep, której używamy, zawsze będzie spać przez co najmniej milisekundę, nawet jeśli przekażemy jej Duration o długości jednej nanosekundy. Ponownie, nowoczesne komputery są szybkie: mogą zrobić wiele w ciągu jednej milisekundy!

Oznacza to, że async może być użyteczne nawet dla zadań związanych z obliczeniami, w zależności od tego, co jeszcze robi twój program, ponieważ dostarcza przydatne narzędzie do strukturyzowania relacji między różnymi częściami programu (ale kosztem narzutu maszyny stanów async). Jest to forma wielozadaniowości kooperacyjnej, gdzie każdy future ma moc decydowania, kiedy przekaże kontrolę za pośrednictwem punktów await. Każdy future ponosi zatem również odpowiedzialność za unikanie zbyt długiego blokowania. W niektórych wbudowanych systemach operacyjnych opartych na Rust jest to jedyny rodzaj wielozadaniowości!

W rzeczywistym kodzie oczywiście nie będziesz na każdej linii przeplatać wywołań funkcji z punktami await. Chociaż przekazywanie kontroli w ten sposób jest stosunkowo niedrogie, nie jest darmowe. W wielu przypadkach próba podziału zadania intensywnie obciążającego CPU może znacznie je spowolnić, więc czasami dla ogólnej wydajności lepiej jest pozwolić operacji na krótkie zablokowanie. Zawsze mierz, aby zobaczyć, gdzie są rzeczywiste wąskie gardła wydajności twojego kodu. Ważne jest jednak, aby pamiętać o podstawowej dynamice, jeśli widzisz dużo pracy wykonywanej szeregowo, a spodziewałeś się, że będzie ona wykonywana równolegle!

Budowanie własnych abstrakcji asynchronicznych

Możemy również łączyć futures, aby tworzyć nowe wzorce. Na przykład możemy zbudować funkcję timeout z już posiadanych asynchronicznych bloków konstrukcyjnych. Kiedy skończymy, wynik będzie kolejnym blokiem konstrukcyjnym, którego moglibyśmy użyć do tworzenia jeszcze bardziej asynchronicznych abstrakcji.

Listing 17-18 pokazuje, jak ten timeout powinien działać z wolnym future.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

Zaimplementujmy to! Na początek pomyślmy o API dla timeout:

  • Musi to być funkcja asynchroniczna sama w sobie, abyśmy mogli na nią czekać (await).
  • Jej pierwszy parametr powinien być futurem do uruchomienia. Możemy uczynić go generycznym, aby mógł działać z dowolnym futurem.
  • Jej drugi parametr będzie maksymalnym czasem oczekiwania. Jeśli użyjemy Duration, ułatwi to przekazanie do trpl::sleep.
  • Powinna zwracać Result. Jeśli future zakończy się pomyślnie, Result będzie Ok z wartością wyprodukowaną przez future. Jeśli limit czasu upłynie wcześniej, Result będzie Err z czasem trwania, na który timeout czekał.

Listing 17-19 pokazuje tę deklarację.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}

To spełnia nasze cele dotyczące typów. Teraz pomyślmy o zachowaniu, którego potrzebujemy: chcemy, aby future przekazane w tle konkurowało z czasem trwania. Możemy użyć trpl::sleep, aby utworzyć future timera z czasu trwania, i użyć trpl::select, aby uruchomić ten timer z futurem przekazanym przez wywołującego.

W Listing 17-20 implementujemy timeout poprzez dopasowanie do wyniku oczekiwania (await) na trpl::select.

extern crate trpl; // required for mdbook test

use std::time::Duration;

use trpl::Either;

// --snip--

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::select(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}

Implementacja trpl::select nie jest sprawiedliwa: zawsze odpytuje argumenty w kolejności, w jakiej zostały przekazane (inne implementacje select losowo wybierają, który argument odpytać jako pierwszy). W ten sposób przekazujemy future_to_try do select jako pierwszy, aby miał szansę na ukończenie, nawet jeśli max_time jest bardzo krótkim czasem trwania. Jeśli future_to_try zakończy się jako pierwszy, select zwróci Left z wynikiem z future_to_try. Jeśli timer zakończy się jako pierwszy, select zwróci Right z wynikiem timera ().

Jeśli future_to_try zakończy się sukcesem i otrzymamy Left(output), zwracamy Ok(output). Jeśli zamiast tego upłynie czas timera uśpienia i otrzymamy Right(()), ignorujemy () za pomocą _ i zamiast tego zwracamy Err(max_time).

Dzięki temu mamy działającą funkcję timeout zbudowaną z dwóch innych asynchronicznych pomocników. Jeśli uruchomimy nasz kod, wydrukuje on tryb awarii po upływie limitu czasu:

Failed after 2 seconds

Ponieważ futures komponują się z innymi futures, możesz budować naprawdę potężne narzędzia, używając mniejszych asynchronicznych bloków konstrukcyjnych. Na przykład, możesz użyć tego samego podejścia do łączenia limitów czasu z ponownymi próbami, a z kolei używać ich z operacjami takimi jak wywołania sieciowe (takie jak te w Listing 17-5).

W praktyce zazwyczaj będziesz pracować bezpośrednio z async i await, a w drugiej kolejności z funkcjami takimi jak select i makrami takimi jak makro join! do kontrolowania sposobu wykonywania zewnętrznych futures.

Widzieliśmy już wiele sposobów pracy z wieloma futures jednocześnie. Następnie przyjrzymy się, jak możemy pracować z wieloma futures w sekwencji w czasie za pomocą strumieni.