Многонишково програмиране 2

задачи, работници, канали

01 декември 2022

Организация на многонишкови програми

Организация на многонишкови програми

Организация на многонишкови програми

Организация на многонишкови програми

Асинхронно програмиране

Threadpool

Threadpool

Threadpool

Threadpool

Threadpool

Threadpool

Пример: https://docs.rs/threadpool

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use threadpool::ThreadPool;
use std::sync::mpsc::channel;

let n_workers = 4;
let n_jobs = 8;
let pool = ThreadPool::new(n_workers);

let (sender, receiver) = channel();
for _ in 0..n_jobs {
    let sender = sender.clone();
    pool.execute(move|| {
        let res = some_work();
        sender.send(res).unwrap();
    });
}

drop(sender);
println!("results: {:?}", receiver.iter().collect::<Vec<_>>());
results: [1, 1, 1, 1, 1, 1, 1, 1]
fn some_work() -> i32 { 1 }
fn main() {
use threadpool::ThreadPool;
use std::sync::mpsc::channel;

let n_workers = 4;
let n_jobs = 8;
let pool = ThreadPool::new(n_workers);

let (sender, receiver) = channel();
for _ in 0..n_jobs {
    let sender = sender.clone();
    pool.execute(move|| {
        let res = some_work();
        sender.send(res).unwrap();
    });
}

drop(sender);
println!("results: {:?}", receiver.iter().collect::>());
}

"Worker threads"

Забележка: не знам официално название за тази парадигма

"Worker threads"

Забележка: не знам официално название за тази парадигма

"Worker threads"

Забележка: не знам официално название за тази парадигма

"Worker threads"

Забележка: не знам официално название за тази парадигма

"Worker threads"

Забележка: не знам официално название за тази парадигма

"Worker threads"

Можем да пуснем нишка във фонов режим, но не можем да си комуникираме с нея.
Дори не можем да я спрем.

1 2 3 4 5 6 7 8
let worker_thread = thread::spawn(run);

fn run() {
    loop {
        do_stuff();
        std::thread::sleep(Duration::from_secs(1));
    }
}
use std::thread;
use std::time::Duration;
fn main() {
let worker_thread = thread::spawn(run);

fn run() {
    loop {
        do_stuff();
        std::thread::sleep(Duration::from_secs(1));
    }
}
fn do_stuff() {}
}

"Worker threads"

Можем да си направим канал между главната нишка и работническата (worker) нишка.
В случая ще спрем работническата нишка когато деструктираме изпращащата част на канала.
Алтернатино бихме могли да използваме съобщение Msg::Close

1 2 3 4 5 6 7 8 9 10 11 12
let (sender, receiver) = mpsc::channel();
let worker_thread = thread::spawn(move || run(receiver));

fn run(receiver: Receiver<Msg>) {
    for msg in receiver.iter() {
        do_stuff(msg);
    }
}

enum Msg {
    /* ... */
}
use std::thread;
use std::sync::mpsc::{self, Receiver};
fn main() {
let (sender, receiver) = mpsc::channel();
let worker_thread = thread::spawn(move || run(receiver));

fn run(receiver: Receiver) {
    for msg in receiver.iter() {
        do_stuff(msg);
    }
}

enum Msg {
    /* ... */
}
fn do_stuff(_msg: Msg) {}
}

"Worker threads"

Опаковаме в няколко структури, за по-лесна организация.
Добавяме и няколко съобщения.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};

pub enum Msg {
    Stuff1,
    Stuff2 { num: u32 },
}

pub struct WorkerHandle {
    pub sender: Sender<Msg>,
    pub thread: JoinHandle<()>,
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();
        let worker_thread = thread::spawn(move || {
            let worker = Worker {};
            run(worker, receiver);
        });
        WorkerHandle { sender, thread: worker_thread }
    }
}

fn run(mut worker: Worker, receiver: Receiver<Msg>) {
    for msg in receiver.iter() {
        match msg {
            Msg::Stuff1 => worker.do_stuff_1(),
            Msg::Stuff2 { num } => worker.do_stuff_2(num),
        }
    }
}

struct Worker {}

impl Worker {
    fn do_stuff_1(&mut self) {
        println!("stuff1");
    }

    fn do_stuff_2(&mut self, num: u32) {
        println!("stuff2 {}", num);
    }
}
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};

pub enum Msg {
    Stuff1,
    Stuff2 { num: u32 },
}

pub struct WorkerHandle {
    pub sender: Sender,
    pub thread: JoinHandle<()>,
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();
        let worker_thread = thread::spawn(move || {
            let worker = Worker {};
            run(worker, receiver);
        });
        WorkerHandle { sender, thread: worker_thread }
    }
}

fn run(mut worker: Worker, receiver: Receiver) {
    for msg in receiver.iter() {
        match msg {
            Msg::Stuff1 => worker.do_stuff_1(),
            Msg::Stuff2 { num } => worker.do_stuff_2(num),
        }
    }
}

struct Worker {}

impl Worker {
    fn do_stuff_1(&mut self) {
        println!("stuff1");
    }

    fn do_stuff_2(&mut self, num: u32) {
        println!("stuff2 {}", num);
    }
}

fn do_stuff(_msg: Msg) {}
fn main() {}

"Worker threads"

Данни

При многонишкови програми е стандартно споделени данни да се държат в Arc<Mutex<_>> или подобно.
Worker нишките ни позволяват и алтернативен подход - ако данните се достъпват само от worker-а те няма нужда да се споделят с други нишки. Това ни позволява да пишем код все едно е еднонишков и да използваме mutable променливи без синхронизация.
Разбира се двата подхода могат да се комбинират - локални mutable данни и споделени данни зад Arc<Mutex<_>>

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};

pub enum Msg {
    Push { data: u32 },
    Pop,
}

pub struct WorkerHandle {
    pub sender: Sender<Msg>,
    pub thread: JoinHandle<()>,
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();
        let worker_thread = thread::spawn(move || {
            let worker = Worker { data_vec: vec![] };
            run(worker, receiver);
        });
        WorkerHandle { sender, thread: worker_thread }
    }
}

fn run(mut worker: Worker, receiver: Receiver<Msg>) {
    // Това е единствената нишка, която има достъп до `worker.data_vec`,
    // затова можем да използваме стандартен код който работи с mutable референции

    for msg in receiver.iter() {
        match msg {
            Msg::Push { data }  => {
                worker.data_vec.push(data);
            }
            Msg::Pop => {
                worker.data_vec.pop();
            }
        }
    }
}

struct Worker {
    data_vec: Vec<u32>,
}
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};

pub enum Msg {
    Push { data: u32 },
    Pop,
}

pub struct WorkerHandle {
    pub sender: Sender,
    pub thread: JoinHandle<()>,
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();
        let worker_thread = thread::spawn(move || {
            let worker = Worker { data_vec: vec![] };
            run(worker, receiver);
        });
        WorkerHandle { sender, thread: worker_thread }
    }
}

fn run(mut worker: Worker, receiver: Receiver) {
    // Това е единствената нишка, която има достъп до `worker.data_vec`,
    // затова можем да използваме стандартен код който работи с mutable референции

    for msg in receiver.iter() {
        match msg {
            Msg::Push { data }  => {
                worker.data_vec.push(data);
            }
            Msg::Pop => {
                worker.data_vec.pop();
            }
        }
    }
}

struct Worker {
    data_vec: Vec,
}

fn main() {}

"Worker threads"

Двупосочна комуникация

Нека добавим съобщение Msg::Get(i), което връща елемент от вектора на изпращача.

Въпросът е как да върнем данни от worker нишката?
Трябва да си създадем канал в обратната посока.
Единия вариант е да създаваме отделен канал за всяко съобщение.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, SyncSender, Receiver};

pub enum Msg {
    Push { data: u32 },
    Pop,

    // в съобщението указваме адреса за връщане
    Get(usize, SyncSender<GetResult>),
}

pub struct GetResult(Option<u32>);

pub struct WorkerHandle {
    pub sender: Sender<Msg>,
    pub thread: JoinHandle<()>,
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();
        let worker_thread = thread::spawn(move || {
            let worker = Worker { data_vec: vec![] };
            run(worker, receiver);
        });
        WorkerHandle { sender, thread: worker_thread }
    }
}

fn run(mut worker: Worker, receiver: Receiver<Msg>) {
    for msg in receiver.iter() {
        match msg {
            Msg::Push { data }  => {
                worker.data_vec.push(data);
            }
            Msg::Pop => {
                worker.data_vec.pop();
            }
            Msg::Get(i, ret_sender) => {
                // `Option::cloned` - от `Option<&T>` към `Option<T>`
                // като клонира вътрешността
                let elem = worker.data_vec.get(i).cloned();

                // Не можем да изпратим - явно получателя се е отказал.
                // Игнорираме грешката
                let _ = ret_sender.send(GetResult(elem));
            }
        }
    }
}

struct Worker {
    data_vec: Vec<u32>,
}

fn main() {
    let handle = WorkerHandle::spawn();

    handle.sender.send(Msg::Push { data: 123 }).expect("worker died");

    // Забележка - това би могло да се скрие зад метод на `WorkerHandle`
    let elem = {
        let (ret_sender, ret_receiver) = mpsc::sync_channel(1);
        handle.sender.send(Msg::Get(0, ret_sender)).expect("workder died");

        // недостатък - блокираме докато чакаме отговора
        let GetResult(elem) = ret_receiver.recv().expect("worker died");
        elem
    };
    println!("{:?}", elem);
}
Some(123)
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, SyncSender, Receiver};

pub enum Msg {
    Push { data: u32 },
    Pop,

    // в съобщението указваме адреса за връщане
    Get(usize, SyncSender),
}

pub struct GetResult(Option);

pub struct WorkerHandle {
    pub sender: Sender,
    pub thread: JoinHandle<()>,
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();
        let worker_thread = thread::spawn(move || {
            let worker = Worker { data_vec: vec![] };
            run(worker, receiver);
        });
        WorkerHandle { sender, thread: worker_thread }
    }
}

fn run(mut worker: Worker, receiver: Receiver) {
    for msg in receiver.iter() {
        match msg {
            Msg::Push { data }  => {
                worker.data_vec.push(data);
            }
            Msg::Pop => {
                worker.data_vec.pop();
            }
            Msg::Get(i, ret_sender) => {
                // `Option::cloned` - от `Option<&T>` към `Option`
                // като клонира вътрешността
                let elem = worker.data_vec.get(i).cloned();

                // Не можем да изпратим - явно получателя се е отказал.
                // Игнорираме грешката
                let _ = ret_sender.send(GetResult(elem));
            }
        }
    }
}

struct Worker {
    data_vec: Vec,
}

fn main() {
    let handle = WorkerHandle::spawn();

    handle.sender.send(Msg::Push { data: 123 }).expect("worker died");

    // Забележка - това би могло да се скрие зад метод на `WorkerHandle`
    let elem = {
        let (ret_sender, ret_receiver) = mpsc::sync_channel(1);
        handle.sender.send(Msg::Get(0, ret_sender)).expect("workder died");

        // недостатък - блокираме докато чакаме отговора
        let GetResult(elem) = ret_receiver.recv().expect("worker died");
        elem
    };
    println!("{:?}", elem);
}

"Worker threads"

Двупосочна комуникация

Втори вариант е да държим канал в handle-a

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};

pub enum Msg {
    Push { data: u32 },
    Pop,
    Get(usize),
}

pub enum RetMsg {
    GetResult(Option<u32>),
}

pub struct WorkerHandle {
    pub sender: Sender<Msg>,
    pub ret_receiver: Receiver<RetMsg>,
    pub thread: JoinHandle<()>,
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();
        let (ret_sender, ret_receiver) = mpsc::channel();

        let worker_thread = thread::spawn(move || {
            let worker = Worker { data_vec: vec![] };
            run(worker, receiver, ret_sender);
        });

        WorkerHandle { sender, ret_receiver, thread: worker_thread }
    }
}

fn run(mut worker: Worker, receiver: Receiver<Msg>, ret_sender: Sender<RetMsg>) {
    for msg in receiver.iter() {
        match msg {
            Msg::Push { data }  => {
                worker.data_vec.push(data);
            }
            Msg::Pop => {
                worker.data_vec.pop();
            }
            Msg::Get(i) => {
                // `Option::cloned` - от `Option<&T>` към `Option<T>`
                // като клонира вътрешността
                let elem = worker.data_vec.get(i).cloned();

                // Не можем да изпратим - явно получателя се е отказал.
                // Игнорираме грешката
                let _ = ret_sender.send(RetMsg::GetResult(elem));
            }
        }
    }
}

struct Worker {
    data_vec: Vec<u32>,
}

fn main() {
    let handle = WorkerHandle::spawn();

    handle.sender.send(Msg::Push { data: 123 }).expect("worker died");
    handle.sender.send(Msg::Get(0)).expect("workder died");

    // По-късно
    let RetMsg::GetResult(elem) = handle.ret_receiver.recv().expect("worker died");
    println!("{:?}", elem);
}
Some(123)
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};

pub enum Msg {
    Push { data: u32 },
    Pop,
    Get(usize),
}

pub enum RetMsg {
    GetResult(Option),
}

pub struct WorkerHandle {
    pub sender: Sender,
    pub ret_receiver: Receiver,
    pub thread: JoinHandle<()>,
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();
        let (ret_sender, ret_receiver) = mpsc::channel();

        let worker_thread = thread::spawn(move || {
            let worker = Worker { data_vec: vec![] };
            run(worker, receiver, ret_sender);
        });

        WorkerHandle { sender, ret_receiver, thread: worker_thread }
    }
}

fn run(mut worker: Worker, receiver: Receiver, ret_sender: Sender) {
    for msg in receiver.iter() {
        match msg {
            Msg::Push { data }  => {
                worker.data_vec.push(data);
            }
            Msg::Pop => {
                worker.data_vec.pop();
            }
            Msg::Get(i) => {
                // `Option::cloned` - от `Option<&T>` към `Option`
                // като клонира вътрешността
                let elem = worker.data_vec.get(i).cloned();

                // Не можем да изпратим - явно получателя се е отказал.
                // Игнорираме грешката
                let _ = ret_sender.send(RetMsg::GetResult(elem));
            }
        }
    }
}

struct Worker {
    data_vec: Vec,
}

fn main() {
    let handle = WorkerHandle::spawn();

    handle.sender.send(Msg::Push { data: 123 }).expect("worker died");
    handle.sender.send(Msg::Get(0)).expect("workder died");

    // По-късно
    let RetMsg::GetResult(elem) = handle.ret_receiver.recv().expect("worker died");
    println!("{:?}", elem);
}

"Worker threads"

Двупосочна комуникация

Недостатъка на втория подход е, имаме само един получател, защото Receiver<RetMsg> не може да се клонира.
Ако искаме да изпращаме съобщения от няколко нишки и всяка нишка да има отделен канал за отговори, можем да го направим като запазваме тези канали в WorkerHandle-а.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
use std::thread;
use std::sync::mpsc::{self, Sender, Receiver};

pub enum Msg {
    Push { data: u32 },
    Pop,
    Get(usize, Sender<RetMsg>),
}

pub enum RetMsg {
    GetResult(Option<u32>),
}

pub struct WorkerHandle {
    pub sender: Sender<Msg>,
    pub ret_sender: Sender<RetMsg>,
    pub ret_receiver: Receiver<RetMsg>,

    // JoinHandle не може да се клонира.
    // Бихме могли да си поиграем с API-то, но за сега
    // ще го игнорираме и ще се фокусираме върху каналите
    // pub thread: JoinHandle<()>,
}

impl Clone for WorkerHandle {
    fn clone(&self) -> Self {
        let (ret_sender, ret_receiver) = mpsc::channel();
        WorkerHandle { sender: self.sender.clone(), ret_sender, ret_receiver }
    }
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();

        let _worker_thread = thread::spawn(move || {
            let worker = Worker { data_vec: vec![] };
            run(worker, receiver);
        });

        let (ret_sender, ret_receiver) = mpsc::channel();
        WorkerHandle { sender, ret_sender, ret_receiver }
    }
}

fn run(mut worker: Worker, receiver: Receiver<Msg>) {
    for msg in receiver.iter() {
        match msg {
            Msg::Push { data }  => {
                worker.data_vec.push(data);
            }
            Msg::Pop => {
                worker.data_vec.pop();
            }
            Msg::Get(i, ret_sender) => {
                // `Option::cloned` - от `Option<&T>` към `Option<T>`
                // като клонира вътрешността
                let elem = worker.data_vec.get(i).cloned();

                // Не можем да изпратим - явно получателя се е отказал.
                // Игнорираме грешката
                let _ = ret_sender.send(RetMsg::GetResult(elem));
            }
        }
    }
}

struct Worker {
    data_vec: Vec<u32>,
}

fn main() {
    let handle = WorkerHandle::spawn();

    handle.sender.send(Msg::Push { data: 123 }).expect("worker died");

    handle.sender.send(Msg::Get(0, handle.ret_sender.clone())).expect("workder died");

    // По-късно
    let RetMsg::GetResult(elem) = handle.ret_receiver.recv().expect("worker died");
    println!("{:?}", elem);
}
Some(123)
use std::thread;
use std::sync::mpsc::{self, Sender, Receiver};

pub enum Msg {
    Push { data: u32 },
    Pop,
    Get(usize, Sender),
}

pub enum RetMsg {
    GetResult(Option),
}

pub struct WorkerHandle {
    pub sender: Sender,
    pub ret_sender: Sender,
    pub ret_receiver: Receiver,

    // JoinHandle не може да се клонира.
    // Бихме могли да си поиграем с API-то, но за сега
    // ще го игнорираме и ще се фокусираме върху каналите
    // pub thread: JoinHandle<()>,
}

impl Clone for WorkerHandle {
    fn clone(&self) -> Self {
        let (ret_sender, ret_receiver) = mpsc::channel();
        WorkerHandle { sender: self.sender.clone(), ret_sender, ret_receiver }
    }
}

impl WorkerHandle {
    pub fn spawn() -> WorkerHandle {
        let (sender, receiver) = mpsc::channel();

        let _worker_thread = thread::spawn(move || {
            let worker = Worker { data_vec: vec![] };
            run(worker, receiver);
        });

        let (ret_sender, ret_receiver) = mpsc::channel();
        WorkerHandle { sender, ret_sender, ret_receiver }
    }
}

fn run(mut worker: Worker, receiver: Receiver) {
    for msg in receiver.iter() {
        match msg {
            Msg::Push { data }  => {
                worker.data_vec.push(data);
            }
            Msg::Pop => {
                worker.data_vec.pop();
            }
            Msg::Get(i, ret_sender) => {
                // `Option::cloned` - от `Option<&T>` към `Option`
                // като клонира вътрешността
                let elem = worker.data_vec.get(i).cloned();

                // Не можем да изпратим - явно получателя се е отказал.
                // Игнорираме грешката
                let _ = ret_sender.send(RetMsg::GetResult(elem));
            }
        }
    }
}

struct Worker {
    data_vec: Vec,
}

fn main() {
    let handle = WorkerHandle::spawn();

    handle.sender.send(Msg::Push { data: 123 }).expect("worker died");

    handle.sender.send(Msg::Get(0, handle.ret_sender.clone())).expect("workder died");

    // По-късно
    let RetMsg::GetResult(elem) = handle.ret_receiver.recv().expect("worker died");
    println!("{:?}", elem);
}

"Worker threads"

Обобщение


Въпроси