Многонишково програмиране

29 ноември 2022

Fearless concurrency

Must be this tall

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    thread::spawn(|| {
        // това най-вероятно няма да се принтира, защото програмата
        // ще завърши преди втората нишка да е почнала
        println!("hi from spawned thread");
    });

    println!("hi from main thread");
}
hi from main thread
use std::thread;

fn main() {
    thread::spawn(|| {
        // това най-вероятно няма да се принтира, защото програмата
        // ще завърши преди втората нишка да е почнала
        println!("hi from spawned thread");
    });

    println!("hi from main thread");
}

Нишки

Сигнатурата на std::thread::spawn

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото изчакваме
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото изчакваме
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // very hard computation ...
        42
    });

    let answ = handle.join();
    println!("{:?}", answ);
}
Ok(42)
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // very hard computation ...
        42
    });

    let answ = handle.join();
    println!("{:?}", answ);
}

Panic в нишка

Panic в нишка

Panic в нишка

Panic в нишка

Panic в нишка

Join в деструктор

1 2 3 4 5 6 7 8 9 10 11 12
struct Wrapper { handle: JoinHandle<()> }

impl Drop for Wrapper {
    fn drop(&mut self) {
        self.handle.join().unwrap();
    }
}

fn main() {
    let handle = thread::spawn(|| println!("Изчакай ме"));
    let wrapper = Wrapper { handle };
}
error[E0507]: cannot move out of `self.handle` which is behind a mutable reference --> src/bin/main_465780dc841f100a7232a8fca5a43bc856234c4c.rs:7:9 | 7 | self.handle.join().unwrap(); | ^^^^^^^^^^^ ------ `self.handle` moved due to this method call | | | move occurs because `self.handle` has type `JoinHandle<()>`, which does not implement the `Copy` trait | note: this function takes ownership of the receiver `self`, which moves `self.handle` --> /home/andrew/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:1522:17 | 1522 | pub fn join(self) -> Result<T> { | ^^^^ For more information about this error, try `rustc --explain E0507`. error: could not compile `rust` due to previous error
use std::thread::{self, JoinHandle};

struct Wrapper { handle: JoinHandle<()> }

impl Drop for Wrapper {
    fn drop(&mut self) {
        self.handle.join().unwrap();
    }
}

fn main() {
    let handle = thread::spawn(|| println!("Изчакай ме"));
    let wrapper = Wrapper { handle };
}

Join в деструктор

1 2 3 4 5 6 7 8 9 10 11 12 13 14
struct Wrapper { handle: Option<JoinHandle<()>> }

impl Drop for Wrapper {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            handle.join().unwrap();
        }
    }
}

fn main() {
    let handle = thread::spawn(|| println!("Изчакай ме"));
    let wrapper = Wrapper { handle: Some(handle) };
}
Изчакай ме
use std::thread::{self, JoinHandle};

struct Wrapper { handle: Option> }

impl Drop for Wrapper {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            handle.join().unwrap();
        }
    }
}

fn main() {
    let handle = thread::spawn(|| println!("Изчакай ме"));
    let wrapper = Wrapper { handle: Some(handle) };
}

Споделяне на стойности

Споделяне на стойности

Нека искаме да достъпим една и съща стойност от няколко нишки.
Тривиалният подход …

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

Нека искаме да достъпим една и съща стойност от няколко нишки.
Тривиалният подход - води до компилационна грешка.

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | for i in &nums { 8 | | println!("number {}", i); 9 | | } 10 | | }); | |______^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `rust` due to previous error
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

Новосъздадената нишка може да надживее функцията в която е извикана, затова Rust не позволява да подадем референции към локални променливи.

Това се налага от ограничението на spawn, която приема F: 'static

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static

Споделяне на стойности

Ако използваме стойността само от новата нишка, можем да я преместим с move closure

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
number 0 number 1 number 2 number 3
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

Но това не би работило ако имаме повече от една нишка

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(move || {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
error[E0382]: use of moved value: `nums` --> src/bin/main_fb665c10ea221cdd41e075bfd1468b11553fe02f.rs:8:36 | 4 | let nums = vec![0, 1, 2, 3]; | ---- move occurs because `nums` has type `Vec<i32>`, which does not implement the `Copy` trait ... 8 | handles.push(thread::spawn(move || { | ^^^^^^^ value moved into closure here, in previous iteration of loop 9 | for i in &nums { | ---- use occurs due to use in closure For more information about this error, try `rustc --explain E0382`. error: could not compile `rust` due to previous error
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(move || {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Scoped threads

Единия вариант е да използваме scoped threads API-то

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                for i in &nums {
                    println!("number {}", i);
                }
            });
        }
    });
}
number 0 number 1 number 2 number 3 number 0 number 1 number 2 number 3
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                for i in &nums {
                    println!("number {}", i);
                }
            });
        }
    });
}

Scoped threads

1 2 3 4 5 6 7 8 9 10 11 12 13 14
thread::scope(|s /*: thread::Scope<'_, '_> */| {
    // тази функция се изпълнява в същата нишка

    // Scope::spawn създава нова нишка
    // Новата нишка може да държи референции към локални променливи
    s.spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    // на края на функцията всички нишки създадени чрез Scope::spawn
    // се join-ват.
});
number 1 number 2 number 3
use std::thread;
fn main() {
let nums = vec![1, 2, 3];
thread::scope(|s /*: thread::Scope<'_, '_> */| {
    // тази функция се изпълнява в същата нишка

    // Scope::spawn създава нова нишка
    // Новата нишка може да държи референции към локални променливи
    s.spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    // на края на функцията всички нишки създадени чрез Scope::spawn
    // се join-ват.
});
}

Scoped threads

В сигнатурата на Scope::spawn ограничението е F: 'scope, а не F: 'static

1 2 3 4 5 6 7
impl<'scope, 'env> Scope<'scope, 'env> {
    pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
    where
        F: FnOnce() -> T + Send + 'scope,
        T: Send + 'scope,
    { /* ... */ }
}

Споделяне на стойности

Да се върнем на thread::spawn.
Друг вариант е да използваме нещо, което:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
fn main() {
    // TODO: какво да добавим тука?
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(|| {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_dbbaffd31a06265d4bc9d454b57919c8eceb4e71.rs:9:36 | 9 | handles.push(thread::spawn(|| { | ^^ may outlive borrowed value `nums` 10 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_dbbaffd31a06265d4bc9d454b57919c8eceb4e71.rs:9:22 | 9 | handles.push(thread::spawn(|| { | ______________________^ 10 | | for i in &nums { 11 | | println!("number {}", i); 12 | | } 13 | | })); | |__________^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 9 | handles.push(thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `rust` due to previous error
use std::thread;

fn main() {
    // TODO: какво да добавим тука?
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(|| {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Rc

Такава структура, която позволява "споделена собственост" (shared ownership) e Rc.
Това дали ще проработи?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Rc

Такава структура, която позволява "споделена собственост" (shared ownership) e Rc.
Това дали ще проработи? - не

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36 | 11 | handles.push(thread::spawn(move || { | ------------- ^------ | | | | ______________________|_____________within this `[closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43]` | | | | | required by a bound introduced by this call 12 | | for i in &*nums_rc { 13 | | println!("number {}", i); 14 | | } 15 | | })); | |_________^ `Rc<Vec<i32>>` cannot be sent between threads safely | = help: within `[closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43]`, the trait `Send` is not implemented for `Rc<Vec<i32>>` note: required because it's used within this closure --> src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36 | 11 | handles.push(thread::spawn(move || { | ^^^^^^^ note: required by a bound in `spawn` --> /home/andrew/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:662:8 | 662 | F: Send + 'static, | ^^^^ required by this bound in `spawn` For more information about this error, try `rustc --explain E0277`. error: could not compile `rust` due to previous error
use std::thread;
use std::rc::Rc;

fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Rc и Arc

Rc не може да се използва от няколко нишки едновременно

Rc и Arc

Rc не може да се използва от няколко нишки едновременно

Rc и Arc

Rc не може да се използва от няколко нишки едновременно

Rc и Arc

Rc не може да се използва от няколко нишки едновременно

Rc и Arc

Rc не може да се използва от няколко нишки едновременно

Rc и Arc

Rc не може да се използва от няколко нишки едновременно

Rc и Arc

Решението е да използваме std::sync::Arc

Rc и Arc

Решението е да използваме std::sync::Arc

Rc и Arc

Решението е да използваме std::sync::Arc

Rc и Arc

Решението е да използваме std::sync::Arc

Rc и Arc

Решението е да използваме std::sync::Arc

Споделяне на стойности - Arc

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_arc = Arc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_arc = Arc::clone(&nums_arc);
        handles.push(thread::spawn(move || {
            for i in &*nums_arc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
number 0 number 1 number 2 number 3 number 0 number 1 number 2 number 3
use std::thread;
use std::sync::Arc;

fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_arc = Arc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_arc = Arc::clone(&nums_arc);
        handles.push(thread::spawn(move || {
            for i in &*nums_arc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Send и Sync

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static

Send и Sync

Трейтовете std::marker::Send и std::marker::Sync показват дали един тип е thread safe.
Т.е. дали обекти от този тип могат да се използват безопасно в многонишков контекст.

1 2
pub unsafe auto trait Send { }
pub unsafe auto trait Sync { }

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Въпрос

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Отговор

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Отговор

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Отговор

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Аuto traits

1
pub struct Token(u32);
pub struct Token(u32);
fn main() {}

Auto trait docs

Send и Sync

Unsafe traits

1 2 3 4
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
fn main() {}
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}

Send и Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send и Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send и Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send и Sync

Деимплементация

Хак за stable

1 2 3
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {}
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);

Примитиви за синхронизация

Примитиви за синхронизация

Стандартния пример за грешен многонишков алгоритъм не се компилира

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;

let t1 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[0..50] { *sum += i; })
};

let t2 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[51..100] { *sum += i; })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);

Примитиви за синхронизация

Защо не се компилира? Какъв може да е типа на sum?

Примитиви за синхронизация

Защо не се компилира? Какъв може да е типа на sum?

Примитиви за синхронизация

Защо не се компилира? Какъв може да е типа на sum?

Примитиви за синхронизация

Защо не се компилира? Какъв може да е типа на sum?

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Модула std::sync

Mutex

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock()` връща "умен указател" с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock()` връща "умен указател" с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Мutex

Обикновенно мутекса се възприема като примитива която определя критична секция

1 2 3 4 5 6 7
lock(my_mutex);
// начало на критичната секция

do_stuff(shared_data);

// край на критичната секция
unlock(my_mutex);

В Ръст това не би било удобно, защото не дава достатъчна информация на компилатора как ползваме данните.
Затова Mutex е generic и опакова данните.

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

RwLock

RwLock

RwLock

RwLock

Mutex или RwLock

Mutex или RwLock

Mutex или RwLock

Mutex или RwLock

Arc + Mutex

Подобно на Rc<RefCell<T>>, може често да виждате Arc<Mutex<T>> или Arc<RwLock<T>>

Arc + Mutex

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
let v = Arc::new((0..100).collect::<Vec<_>>());
let total_sum = Arc::new(Mutex::new(0));

let t1 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[..50].iter().sum::<i32>();
        *total_sum.lock().unwrap() += local_sum;
    })
};

let t2 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[50..].iter().sum::<i32>();
        *total_sum.lock().unwrap() += local_sum;
    })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", *total_sum.lock().unwrap());
sum: 4950
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let v = Arc::new((0..100).collect::>());
let total_sum = Arc::new(Mutex::new(0));

let t1 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[..50].iter().sum::();
        *total_sum.lock().unwrap() += local_sum;
    })
};

let t2 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[50..].iter().sum::();
        *total_sum.lock().unwrap() += local_sum;
    })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", *total_sum.lock().unwrap());
}

Arc + Mutex

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
let v = (0..100).collect::<Vec<_>>();
let mut total_sum = Mutex::new(0);

thread::scope(|s| {
    s.spawn(|| {
        let local_sum = v[..50].iter().sum::<i32>();
        *total_sum.lock().unwrap() += local_sum;
    });

    s.spawn(|| {
        let local_sum = v[50..].iter().sum::<i32>();
        *total_sum.lock().unwrap() += local_sum;
    });
});

println!("sum: {}", *total_sum.get_mut().unwrap());
sum: 4950
use std::thread;
use std::sync::Mutex;
fn main() {
let v = (0..100).collect::>();
let mut total_sum = Mutex::new(0);

thread::scope(|s| {
    s.spawn(|| {
        let local_sum = v[..50].iter().sum::();
        *total_sum.lock().unwrap() += local_sum;
    });

    s.spawn(|| {
        let local_sum = v[50..].iter().sum::();
        *total_sum.lock().unwrap() += local_sum;
    });
});

println!("sum: {}", *total_sum.get_mut().unwrap());
}

Други примитиви

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Пример

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
use std::sync::atomic::{AtomicI32, Ordering};

let v = Arc::new((0..100).collect::<Vec<_>>());
let total_sum = Arc::new(AtomicI32::new(0));

let t1 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[..50].iter().sum::<i32>();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    })
};

let t2 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[50..].iter().sum::<i32>();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", total_sum.load(Ordering::SeqCst));
sum: 4950
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
let v = Arc::new((0..100).collect::>());
let total_sum = Arc::new(AtomicI32::new(0));

let t1 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[..50].iter().sum::();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    })
};

let t2 = {
    let v = Arc::clone(&v);
    let total_sum = Arc::clone(&total_sum);
    thread::spawn(move || {
        let local_sum = v[50..].iter().sum::();
        total_sum.fetch_add(local_sum, Ordering::SeqCst);
    })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", total_sum.load(Ordering::SeqCst));
}

Канали

MPSC

Канали

Don't communicate by sharing memory,
share memory by communicating

Канали в стандартната библиотека

1 2 3 4 5 6 7 8 9 10 11 12
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}
received 10
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
}

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Ограничен канал

1 2 3 4 5 6 7 8 9 10 11 12
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
}

Множество изпращачи

Изпращащата част може да се клонира

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
1 2 3 4
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
}

Sender

Методи

1 2 3 4
// изпраща `t`
// връща грешка ако получателят е бил унищожен
fn send(&self, t: T) -> Result<(), SendError<T>>
struct SendError<T>(pub T);

Sender

Методи

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

assert_eq!(sender.send(12), Ok(()));

// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);

// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
use std::mem;
use std::sync::mpsc::{self, SendError};
fn main() {
let (sender, receiver) = mpsc::channel();

assert_eq!(sender.send(12), Ok(()));

// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);

// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
}

SyncSender

Методи

1 2 3 4 5 6 7 8 9
// блокира ако буфера е пълен
// връща грешка ако получателят е бил унищожен
fn send(&self, t: T) -> Result<(), SendError<T>>
struct SendError<T>(pub T);

// никога не блокира
// връща грешка ако буфера е пълен или получателят е бил унищожен
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>
enum TrySendError<T> { Full(T), Disconnected(T) }

SyncSender

Методи

1 2 3 4 5 6 7 8
let (sender, receiver) = mpsc::sync_channel(1);

assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));

mem::drop(receiver);

assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
use std::mem;
use std::sync::mpsc::{self, TrySendError};
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);

assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));

mem::drop(receiver);

assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
}

Множество получатели

Множество получатели

Множество получатели

Множество получатели

Receiver

Методи

1 2 3 4 5 6 7 8 9 10 11 12 13 14
// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>
struct RecvError;

// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>
pub enum TryRecvError { Empty, Disconnected }

// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>
enum RecvTimeoutError { Timeout, Disconnected }

Receiver

Методи

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

while let Ok(msg) = receiver.recv() {
    println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

while let Ok(msg) = receiver.recv() {
    println!("received {}", msg);
}
}

Receiver

Итератори

1 2 3 4 5 6 7 8 9 10 11 12 13 14
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

// обхожда всички съобщения в канала
// ако няма налично съобщение блокира
// излиза от цикъла когато всички изпращачи са унищожени
for msg in receiver.iter() {
    println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

// обхожда всички съобщения в канала
// ако няма налично съобщение блокира
// излиза от цикъла когато всички изпращачи са унищожени
for msg in receiver.iter() {
    println!("received {}", msg);
}
}

Receiver

Итератори

1 2 3 4 5 6 7 8 9 10 11 12 13
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

// обхожда всички вече изпратени съобения в канала,
// след което излиза от цикъла
for msg in receiver.try_iter() {
    println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

// обхожда всички вече изпратени съобения в канала,
// след което излиза от цикъла
for msg in receiver.try_iter() {
    println!("received {}", msg);
}
}

Crossbeam channel

Често използвана алтернативна имплементация на канали

Препоръчвам използването на crossbeam-channel пред std::sync::mpsc

Имплементацията на каналите в std скоро ще бъде сменена да използва crossbeam-channel
https://github.com/rust-lang/rust/pull/93563

Crossbeam channel

Разлики в API-то

std::sync::mpsc crossbeam_channel
вид MPSC MPMC
неограничен channel() unbounded()
неограничен - типове (Sender, Receiver) (Sender, Receiver)
ограничен sync_channel(k) bounded(k)
ограничен - типове (SyncSender, Receiver) (Sender, Receiver)

Crossbeam channel

Разлики в API-то

std::sync::mpsc::Sender std::sync::mpsc::SyncSender crossbeam_channel::Sender
трейтове Send + Clone Send + Clone Send + Sync + Clone
try_send n/a
send
send_timeout - -
std::sync::mpsc::Receiver crossbeam_channel::Receiver
трейтове Send Send + Sync + Clone
try_recv
recv
recv_timeout

Външни библиотеки

Crossbeam

Външни библиотеки

Parking lot

Въпроси