Async/.await

15 декември 2022

Административни неща

Паралелизъм и concurrency

Паралелизъм и concurrency

Паралелизъм и concurrency

Паралелизъм и concurrency

Двете понятия са ортогонални:

Приложение

Приложение

Приложение

Защо?

Наивна имплементация на уеб сървър би изглеждала подобно.
Но тази имплементация има множество проблеми.

1 2 3 4 5 6 7 8
// псевдокод
for stream in listener.accept() {
    thread::spawn(move || {
        let request = stream.read();
        let response = process(request);
        stream.write(response);
    });
}

Защо?

Защо?

Защо?

Защо?

Защо?

Защо?

Защо?

Защо?

Как?

Как?

Как?

Как?

Как?

Как?

Как?

Как?

Как?

Как?

Как?

Как?

Имплементации

Event driven (с callback функции)

Имплементации

Event driven (с callback функции)

Имплементации

Event driven (с callback функции)

Имплементации

Event driven (с callback функции)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
const http = require('http');
const server = http.createServer();

server
    .on('request', (request, response) => {
        let body = [];
        request
            .on('data', (chunk) => {
                body.push(chunk);
            })
            .on('end', () => {
                body = Buffer.concat(body).toString();
                console.log('Body: ', body);
                response.end();
            });
    })
    .listen(8080);

Имплементации

Event driven (с callback функции)

Недостатъци:

Имплементации

Coroutines

Имплементации

Coroutines

Имплементации

Coroutines

Асинхронно програмиране в Rust

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = "127.0.0.1:8080".to_string();
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                let n = socket
                    .read(&mut buf)
                    .await
                    .expect("failed to read data from socket");

                if n == 0 {
                    return;
                }

                socket
                    .write_all(&buf[0..n])
                    .await
                    .expect("failed to write data to socket");
            }
        });
    }
}
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box> {
    let addr = "127.0.0.1:8080".to_string();
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                let n = socket
                    .read(&mut buf)
                    .await
                    .expect("failed to read data from socket");

                if n == 0 {
                    return;
                }

                socket
                    .write_all(&buf[0..n])
                    .await
                    .expect("failed to write data to socket");
            }
        });
    }
}

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Асинхронно програмиране в Rust

Библиотечни функции:

Изпълними програми:

Async/.await в Rust

async функции

1 2 3 4
// връща анонимен тип, който имплементира trait-а `Future<Output = u8>`
async fn five() -> u8 {
    5
}
#![allow(dead_code)]
// връща анонимен тип, който имплементира trait-а `Future`
async fn five() -> u8 {
    5
}
fn main() {}

Async/.await в Rust

async блокове

1 2 3 4 5 6 7 8 9
use std::future::Future;

fn ten() -> impl Future<Output = u8> {
    // връща анонимен тип, който имплементира trait-а `Future<Output = u8>`
    async {
        let x: u8 = five().await;
        x + 5
    }
}
#![allow(dead_code)]
use std::future::Future;

fn ten() -> impl Future {
    // връща анонимен тип, който имплементира trait-а `Future`
    async {
        let x: u8 = five().await;
        x + 5
    }
}
async fn five() -> u8 { 5 }
fn main() {}

Async/.await в Rust

.await

1 2 3 4 5 6 7 8 9 10 11
async fn five() -> u8 {
    5
}

async fn ten() -> u8 {
    five().await + 5
}

fn main() {
    let x = ten().await;
}
error[E0728]: `await` is only allowed inside `async` functions and blocks --> src/bin/main_c247aff303b227bd4f1fa2405d3f7ad357520e11.rs:10:18 | 9 | fn main() { | ---- this is not `async` 10 | let x = ten().await; | ^^^^^^ only allowed inside `async` functions and blocks For more information about this error, try `rustc --explain E0728`. error: could not compile `rust` due to previous error
async fn five() -> u8 {
    5
}

async fn ten() -> u8 {
    five().await + 5
}

fn main() {
    let x = ten().await;
}

Async/.await в Rust

.await

1 2 3 4 5 6 7 8 9 10 11 12 13
async fn five() -> u8 {
    5
}

async fn ten() -> u8 {
    five().await + 5
}

fn main() {
    let x = ::futures::executor::block_on(async {
        ten().await
    });
}
async fn five() -> u8 {
    5
}

async fn ten() -> u8 {
    five().await + 5
}

fn main() {
    let x = ::futures::executor::block_on(async {
        ten().await
    });
}

Trait Future

1 2 3 4 5 6 7 8 9 10
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
use std::task::Context;
use std::pin::Pin;

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}

pub enum Poll {
    Ready(T),
    Pending,
}

fn main() {}

Trait Future

1 2 3 4 5
async fn foo() {
    println!("foo");
}

let foo_future = foo();
#![allow(unused_variables)]
fn main() {
async fn foo() {
    println!("foo");
}

let foo_future = foo();
}

Изпълнение на future

Future може да се изпълни

Изпълнение на future

Async функцията връща анонимна структура, която имплементира трейта Future.
Тази структура е enum, който съдържа всички възможни междинни състояния.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
fn main() {
    let fut1 = async_func1();
    ::futures::executor::block_on(fut1);
}

async fn async_func1() -> i32 {
    let fut2 = another_async_func();
    let x = fut2.await;

    let y = regular_func();

    let fut3 = make_call_to_db();
    let z = fut3.await;

    x + y + z
}

fn main() {
    let fut1 = async_func1();
    ::futures::executor::block_on(fut1);
}

async fn async_func1() -> i32 {
    let fut2 = another_async_func();
    let x = fut2.await;

    let y = regular_func();

    let fut3 = make_call_to_db();
    let z = fut3.await;

    x + y + z
}
async fn another_async_func() -> i32 { 0 }
async fn make_call_to_db() -> i32 { 0 }
fn regular_func() -> i32 { 0 }

1 2 3 4 5 6
enum Fut1 {
    Init,
    AtAwait1 { fut2: Fut2 },
    AtAwait2 { x: i32, y: i32, fut3: Fut3 },
    Done,
}

Изпълнение на future

Futures извършват прогрес, когато някой executor им извика poll

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
enum Fut1 {
    Init,
    AtAwait1 { fut2: Fut2 },
    AtAwait2 { x: i32, y: i32, fut3: Fut3 },
    Done,
}

impl Future for Fut1 {
    type Output = i32;

    // Една възможна имплементация
    // Все още игнорираме `Pin` и `Context`
    fn poll(self: &mut Self) -> Poll<Self::Output> {
        loop {

            match std::mem::replace(self, Fut1::Done) {
                Fut1::Init => {
                    let fut2 = another_async_func();
                    *self = Fut1::AtAwait1 { fut2 };
                }
                Fut1::AtAwait1 { mut fut2 } => {
                    match fut2.poll() {
                        Poll::Ready(res) => {
                            let x = res;
                            let y = regular_function();
                            let fut3 = make_call_to_db();

                            *self = Fut1::AtAwait2 { x, y, fut3 };
                        },
                        Poll::Pending => {
                            *self = Fut1::AtAwait1 { fut2 };
                            return Poll::Pending;
                        }
                    }
                },
                Fut1::AtAwait2 { x, y, mut fut3 } => {
                    match fut3.poll() {
                        Poll::Ready(res) => {
                            let z = res;
                            return Poll::Ready(x + y + z);
                        },
                        Poll::Pending => {
                            *self = Fut1::AtAwait2 { x, y, fut3 };
                            return Poll::Pending;
                        }
                    }
                }
                Fut1::Done => {
                    panic!("`poll` called on a finished future");
                }
            }

        }
    }
}
trait Future {
type Output;
fn poll(self: &mut Self) -> Poll;
}
enum Poll { Ready(T), Pending }

enum Fut1 {
    Init,
    AtAwait1 { fut2: Fut2 },
    AtAwait2 { x: i32, y: i32, fut3: Fut3 },
    Done,
}
struct Fut2;
impl Future for Fut2 { type Output = i32; fn poll(&mut self) -> Poll { todo!(); } }
struct Fut3;
impl Future for Fut3 { type Output = i32; fn poll(&mut self) -> Poll { todo!(); } }
fn regular_function() -> i32 { 0 }
fn another_async_func() -> Fut2 { todo!() }
fn make_call_to_db() -> Fut3 { todo!() }

impl Future for Fut1 {
    type Output = i32;

    // Една възможна имплементация
    // Все още игнорираме `Pin` и `Context`
    fn poll(self: &mut Self) -> Poll {
        loop {

            match std::mem::replace(self, Fut1::Done) {
                Fut1::Init => {
                    let fut2 = another_async_func();
                    *self = Fut1::AtAwait1 { fut2 };
                }
                Fut1::AtAwait1 { mut fut2 } => {
                    match fut2.poll() {
                        Poll::Ready(res) => {
                            let x = res;
                            let y = regular_function();
                            let fut3 = make_call_to_db();

                            *self = Fut1::AtAwait2 { x, y, fut3 };
                        },
                        Poll::Pending => {
                            *self = Fut1::AtAwait1 { fut2 };
                            return Poll::Pending;
                        }
                    }
                },
                Fut1::AtAwait2 { x, y, mut fut3 } => {
                    match fut3.poll() {
                        Poll::Ready(res) => {
                            let z = res;
                            return Poll::Ready(x + y + z);
                        },
                        Poll::Pending => {
                            *self = Fut1::AtAwait2 { x, y, fut3 };
                            return Poll::Pending;
                        }
                    }
                }
                Fut1::Done => {
                    panic!("`poll` called on a finished future");
                }
            }

        }
    }
}
fn main() {}

Executors

Executors

Executors

Tokio

Tokio имплементира цялата машинария за изпълнение на future-и:

Tokio

Нормално главната функция на програмата не може да е async.
Tokio предоставя макрос, който скрива това

1 2 3
#[tokio::main]
async fn main() {
}
#[tokio::main]
async fn main() {
}

Tokio

Не е нужно да се използва макроса, същото може да се постигне и на ръка.
Този начин също така позволява настройване на библиотеката

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
use tokio::runtime::Runtime;

fn main() {
    let runtime = Runtime::new().unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /* ... */
}
use tokio::runtime::Runtime;

fn main() {
    let runtime = Runtime::new().unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box> {
    /* ... */
Ok(())
}

Tokio

По подразбиране tokio стартира многонишков runtime.
Това в много случаи не е необходимо - tokio ни дава concurrency, много често не ни е нужен и паралелизъм. В такива случаи можем да настроим tokio да използва еднонишков runtime.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_current_thread()
        .build()
        .unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /* ... */
}
use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_current_thread()
        .build()
        .unwrap();

    match runtime.block_on(run()) {
        Ok(()) => (),
        Err(e) => {
            eprintln!("An error occurred: {}", e);
            std::process::exit(1);
        }
    }
}

async fn run() -> Result<(), Box> {
    /* ... */
Ok(())
}

Executors

Executors

Executors

Executors

Executors

Executors

Съвместимост

Executors

Съвместимост

Executors

Съвместимост

Executors

Съвместимост

Futures екосистемата

Futures екосистемата

Futures екосистемата

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Блокиращи операции

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Примитиви за синхронизация

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Тежки операции

Подводни камъни

Задържане на променливи между await състояния

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тука
    let val1 = some_async_op(&big_vec).await;

    let val2 = other_async_op().await;

    val1 + val2

    // Деструктора на `big_vec` се извиква тука, в края на scope-а.
    // Това значи че променливата трябва да се пази жива през цялото време
    // докато future-а е жив. Ако имаме хиляди копия на този future - това е
    // ненужно заемане на памет
}
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тука
    let val1 = some_async_op(&big_vec).await;

    let val2 = other_async_op().await;

    val1 + val2

    // Деструктора на `big_vec` се извиква тука, в края на scope-а.
    // Това значи че променливата трябва да се пази жива през цялото време
    // докато future-а е жив. Ако имаме хиляди копия на този future - това е
    // ненужно заемане на памет
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}

Подводни камъни

Задържане на променливи между await състояния

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тука
    let val1 = some_async_op(&big_vec).await;

    // Ръчно деструктираме променливата.
    // Така тя няма да се пази в състоянието на future-а при
    // следващите await точки.
    drop(big_vec);

    let val2 = other_async_op().await;

    val1 + val2
}
async fn some_function() -> i32 {
    let big_vec = vec![ /* ... */ ];

    // Променливата `big_vec` се използва за последно тука
    let val1 = some_async_op(&big_vec).await;

    // Ръчно деструктираме променливата.
    // Така тя няма да се пази в състоянието на future-а при
    // следващите await точки.
    drop(big_vec);

    let val2 = other_async_op().await;

    val1 + val2
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}

Допълнителни материали


Въпроси