Многонишково програмиране 2
задачи, работници, канали
01 декември 2022
Организация на многонишкови програми
Организация на многонишкови програми
- има много варианти, ще покажем само някои как се реализират в Ръст
Организация на многонишкови програми
- има много варианти, ще покажем само някои как се реализират в Ръст
- кой е най-удачен зависи много от ситуацията
Организация на многонишкови програми
- има много варианти, ще покажем само някои как се реализират в Ръст
- кой е най-удачен зависи много от ситуацията
- идеята е по-скоро да покажем Ръст код, а не да разглеждаме различни архитектури
Асинхронно програмиране
- ще говорим в отделна лекция
Threadpool
Threadpool
- или директно или чрез библиотека като
rayon
Threadpool
- или директно или чрез библиотека като
rayon
- глобална опашка от задачи, които се разпределят автоматично върху нишки
Threadpool
- или директно или чрез библиотека като
rayon
- глобална опашка от задачи, които се разпределят автоматично върху нишки
- третираме задачите като равностойни
Threadpool
- или директно или чрез библиотека като
rayon
- глобална опашка от задачи, които се разпределят автоматично върху нишки
- третираме задачите като равностойни
- използва се когато ни трябва паралелизация
Threadpool
Пример: https://docs.rs/threadpool
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
let n_workers = 4;
let n_jobs = 8;
let pool = ThreadPool::new(n_workers);
let (sender, receiver) = channel();
for _ in 0..n_jobs {
let sender = sender.clone();
pool.execute(move|| {
let res = some_work();
sender.send(res).unwrap();
});
}
drop(sender);
println!("results: {:?}", receiver.iter().collect::<Vec<_>>());
results: [1, 1, 1, 1, 1, 1, 1, 1]
fn some_work() -> i32 { 1 } fn main() { use threadpool::ThreadPool; use std::sync::mpsc::channel; let n_workers = 4; let n_jobs = 8; let pool = ThreadPool::new(n_workers); let (sender, receiver) = channel(); for _ in 0..n_jobs { let sender = sender.clone(); pool.execute(move|| { let res = some_work(); sender.send(res).unwrap(); }); } drop(sender); println!("results: {:?}", receiver.iter().collect::>()); }
"Worker threads"
Забележка: не знам официално название за тази парадигма
"Worker threads"
Забележка: не знам официално название за тази парадигма
- пускаме някаква нишка във фонов режим
"Worker threads"
Забележка: не знам официално название за тази парадигма
- пускаме някаква нишка във фонов режим
- изпълнява работа периодично или като и се изпрати съобщение
"Worker threads"
Забележка: не знам официално название за тази парадигма
- пускаме някаква нишка във фонов режим
- изпълнява работа периодично или като и се изпрати съобщение
- не е най-мощната или ефикасната парадигма
"Worker threads"
Забележка: не знам официално название за тази парадигма
- пускаме някаква нишка във фонов режим
- изпълнява работа периодично или като и се изпрати съобщение
- не е най-мощната или ефикасната парадигма
- но е лесна за имплементиране и помага с организация на кода
"Worker threads"
Можем да пуснем нишка във фонов режим, но не можем да си комуникираме с нея.
Дори не можем да я спрем.
let worker_thread = thread::spawn(run);
fn run() {
loop {
do_stuff();
std::thread::sleep(Duration::from_secs(1));
}
}
use std::thread; use std::time::Duration; fn main() { let worker_thread = thread::spawn(run); fn run() { loop { do_stuff(); std::thread::sleep(Duration::from_secs(1)); } } fn do_stuff() {} }
"Worker threads"
Можем да си направим канал между главната нишка и работническата (worker) нишка.
В случая ще спрем работническата нишка когато деструктираме изпращащата част на канала.
Алтернатино бихме могли да използваме съобщение Msg::Close
let (sender, receiver) = mpsc::channel();
let worker_thread = thread::spawn(move || run(receiver));
fn run(receiver: Receiver<Msg>) {
for msg in receiver.iter() {
do_stuff(msg);
}
}
enum Msg {
/* ... */
}
use std::thread; use std::sync::mpsc::{self, Receiver}; fn main() { let (sender, receiver) = mpsc::channel(); let worker_thread = thread::spawn(move || run(receiver)); fn run(receiver: Receiver) { for msg in receiver.iter() { do_stuff(msg); } } enum Msg { /* ... */ } fn do_stuff(_msg: Msg) {} }
"Worker threads"
Опаковаме в няколко структури, за по-лесна организация.
Добавяме и няколко съобщения.
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};
pub enum Msg {
Stuff1,
Stuff2 { num: u32 },
}
pub struct WorkerHandle {
pub sender: Sender<Msg>,
pub thread: JoinHandle<()>,
}
impl WorkerHandle {
pub fn spawn() -> WorkerHandle {
let (sender, receiver) = mpsc::channel();
let worker_thread = thread::spawn(move || {
let worker = Worker {};
run(worker, receiver);
});
WorkerHandle { sender, thread: worker_thread }
}
}
fn run(mut worker: Worker, receiver: Receiver<Msg>) {
for msg in receiver.iter() {
match msg {
Msg::Stuff1 => worker.do_stuff_1(),
Msg::Stuff2 { num } => worker.do_stuff_2(num),
}
}
}
struct Worker {}
impl Worker {
fn do_stuff_1(&mut self) {
println!("stuff1");
}
fn do_stuff_2(&mut self, num: u32) {
println!("stuff2 {}", num);
}
}
use std::thread::{self, JoinHandle}; use std::sync::mpsc::{self, Sender, Receiver}; pub enum Msg { Stuff1, Stuff2 { num: u32 }, } pub struct WorkerHandle { pub sender: Sender, pub thread: JoinHandle<()>, } impl WorkerHandle { pub fn spawn() -> WorkerHandle { let (sender, receiver) = mpsc::channel(); let worker_thread = thread::spawn(move || { let worker = Worker {}; run(worker, receiver); }); WorkerHandle { sender, thread: worker_thread } } } fn run(mut worker: Worker, receiver: Receiver ) { for msg in receiver.iter() { match msg { Msg::Stuff1 => worker.do_stuff_1(), Msg::Stuff2 { num } => worker.do_stuff_2(num), } } } struct Worker {} impl Worker { fn do_stuff_1(&mut self) { println!("stuff1"); } fn do_stuff_2(&mut self, num: u32) { println!("stuff2 {}", num); } } fn do_stuff(_msg: Msg) {} fn main() {}
"Worker threads"
Данни
При многонишкови програми е стандартно споделени данни да се държат в Arc<Mutex<_>>
или подобно.
Worker нишките ни позволяват и алтернативен подход - ако данните се достъпват само от worker-а те няма нужда да се споделят с други нишки. Това ни позволява да пишем код все едно е еднонишков и да използваме mutable променливи без синхронизация.
Разбира се двата подхода могат да се комбинират - локални mutable данни и споделени данни зад Arc<Mutex<_>>
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};
pub enum Msg {
Push { data: u32 },
Pop,
}
pub struct WorkerHandle {
pub sender: Sender<Msg>,
pub thread: JoinHandle<()>,
}
impl WorkerHandle {
pub fn spawn() -> WorkerHandle {
let (sender, receiver) = mpsc::channel();
let worker_thread = thread::spawn(move || {
let worker = Worker { data_vec: vec![] };
run(worker, receiver);
});
WorkerHandle { sender, thread: worker_thread }
}
}
fn run(mut worker: Worker, receiver: Receiver<Msg>) {
// Това е единствената нишка, която има достъп до `worker.data_vec`,
// затова можем да използваме стандартен код който работи с mutable референции
for msg in receiver.iter() {
match msg {
Msg::Push { data } => {
worker.data_vec.push(data);
}
Msg::Pop => {
worker.data_vec.pop();
}
}
}
}
struct Worker {
data_vec: Vec<u32>,
}
use std::thread::{self, JoinHandle}; use std::sync::mpsc::{self, Sender, Receiver}; pub enum Msg { Push { data: u32 }, Pop, } pub struct WorkerHandle { pub sender: Sender, pub thread: JoinHandle<()>, } impl WorkerHandle { pub fn spawn() -> WorkerHandle { let (sender, receiver) = mpsc::channel(); let worker_thread = thread::spawn(move || { let worker = Worker { data_vec: vec![] }; run(worker, receiver); }); WorkerHandle { sender, thread: worker_thread } } } fn run(mut worker: Worker, receiver: Receiver ) { // Това е единствената нишка, която има достъп до `worker.data_vec`, // затова можем да използваме стандартен код който работи с mutable референции for msg in receiver.iter() { match msg { Msg::Push { data } => { worker.data_vec.push(data); } Msg::Pop => { worker.data_vec.pop(); } } } } struct Worker { data_vec: Vec , } fn main() {}
"Worker threads"
Двупосочна комуникация
Нека добавим съобщение Msg::Get(i)
, което връща елемент от вектора на изпращача.
Въпросът е как да върнем данни от worker нишката?
Трябва да си създадем канал в обратната посока.
Единия вариант е да създаваме отделен канал за всяко съобщение.
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, SyncSender, Receiver};
pub enum Msg {
Push { data: u32 },
Pop,
// в съобщението указваме адреса за връщане
Get(usize, SyncSender<GetResult>),
}
pub struct GetResult(Option<u32>);
pub struct WorkerHandle {
pub sender: Sender<Msg>,
pub thread: JoinHandle<()>,
}
impl WorkerHandle {
pub fn spawn() -> WorkerHandle {
let (sender, receiver) = mpsc::channel();
let worker_thread = thread::spawn(move || {
let worker = Worker { data_vec: vec![] };
run(worker, receiver);
});
WorkerHandle { sender, thread: worker_thread }
}
}
fn run(mut worker: Worker, receiver: Receiver<Msg>) {
for msg in receiver.iter() {
match msg {
Msg::Push { data } => {
worker.data_vec.push(data);
}
Msg::Pop => {
worker.data_vec.pop();
}
Msg::Get(i, ret_sender) => {
// `Option::cloned` - от `Option<&T>` към `Option<T>`
// като клонира вътрешността
let elem = worker.data_vec.get(i).cloned();
// Не можем да изпратим - явно получателя се е отказал.
// Игнорираме грешката
let _ = ret_sender.send(GetResult(elem));
}
}
}
}
struct Worker {
data_vec: Vec<u32>,
}
fn main() {
let handle = WorkerHandle::spawn();
handle.sender.send(Msg::Push { data: 123 }).expect("worker died");
// Забележка - това би могло да се скрие зад метод на `WorkerHandle`
let elem = {
let (ret_sender, ret_receiver) = mpsc::sync_channel(1);
handle.sender.send(Msg::Get(0, ret_sender)).expect("workder died");
// недостатък - блокираме докато чакаме отговора
let GetResult(elem) = ret_receiver.recv().expect("worker died");
elem
};
println!("{:?}", elem);
}
Some(123)
use std::thread::{self, JoinHandle}; use std::sync::mpsc::{self, Sender, SyncSender, Receiver}; pub enum Msg { Push { data: u32 }, Pop, // в съобщението указваме адреса за връщане Get(usize, SyncSender), } pub struct GetResult(Option ); pub struct WorkerHandle { pub sender: Sender , pub thread: JoinHandle<()>, } impl WorkerHandle { pub fn spawn() -> WorkerHandle { let (sender, receiver) = mpsc::channel(); let worker_thread = thread::spawn(move || { let worker = Worker { data_vec: vec![] }; run(worker, receiver); }); WorkerHandle { sender, thread: worker_thread } } } fn run(mut worker: Worker, receiver: Receiver ) { for msg in receiver.iter() { match msg { Msg::Push { data } => { worker.data_vec.push(data); } Msg::Pop => { worker.data_vec.pop(); } Msg::Get(i, ret_sender) => { // `Option::cloned` - от `Option<&T>` към `Option ` // като клонира вътрешността let elem = worker.data_vec.get(i).cloned(); // Не можем да изпратим - явно получателя се е отказал. // Игнорираме грешката let _ = ret_sender.send(GetResult(elem)); } } } } struct Worker { data_vec: Vec , } fn main() { let handle = WorkerHandle::spawn(); handle.sender.send(Msg::Push { data: 123 }).expect("worker died"); // Забележка - това би могло да се скрие зад метод на `WorkerHandle` let elem = { let (ret_sender, ret_receiver) = mpsc::sync_channel(1); handle.sender.send(Msg::Get(0, ret_sender)).expect("workder died"); // недостатък - блокираме докато чакаме отговора let GetResult(elem) = ret_receiver.recv().expect("worker died"); elem }; println!("{:?}", elem); }
"Worker threads"
Двупосочна комуникация
Втори вариант е да държим канал в handle-a
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{self, Sender, Receiver};
pub enum Msg {
Push { data: u32 },
Pop,
Get(usize),
}
pub enum RetMsg {
GetResult(Option<u32>),
}
pub struct WorkerHandle {
pub sender: Sender<Msg>,
pub ret_receiver: Receiver<RetMsg>,
pub thread: JoinHandle<()>,
}
impl WorkerHandle {
pub fn spawn() -> WorkerHandle {
let (sender, receiver) = mpsc::channel();
let (ret_sender, ret_receiver) = mpsc::channel();
let worker_thread = thread::spawn(move || {
let worker = Worker { data_vec: vec![] };
run(worker, receiver, ret_sender);
});
WorkerHandle { sender, ret_receiver, thread: worker_thread }
}
}
fn run(mut worker: Worker, receiver: Receiver<Msg>, ret_sender: Sender<RetMsg>) {
for msg in receiver.iter() {
match msg {
Msg::Push { data } => {
worker.data_vec.push(data);
}
Msg::Pop => {
worker.data_vec.pop();
}
Msg::Get(i) => {
// `Option::cloned` - от `Option<&T>` към `Option<T>`
// като клонира вътрешността
let elem = worker.data_vec.get(i).cloned();
// Не можем да изпратим - явно получателя се е отказал.
// Игнорираме грешката
let _ = ret_sender.send(RetMsg::GetResult(elem));
}
}
}
}
struct Worker {
data_vec: Vec<u32>,
}
fn main() {
let handle = WorkerHandle::spawn();
handle.sender.send(Msg::Push { data: 123 }).expect("worker died");
handle.sender.send(Msg::Get(0)).expect("workder died");
// По-късно
let RetMsg::GetResult(elem) = handle.ret_receiver.recv().expect("worker died");
println!("{:?}", elem);
}
Some(123)
use std::thread::{self, JoinHandle}; use std::sync::mpsc::{self, Sender, Receiver}; pub enum Msg { Push { data: u32 }, Pop, Get(usize), } pub enum RetMsg { GetResult(Option), } pub struct WorkerHandle { pub sender: Sender , pub ret_receiver: Receiver , pub thread: JoinHandle<()>, } impl WorkerHandle { pub fn spawn() -> WorkerHandle { let (sender, receiver) = mpsc::channel(); let (ret_sender, ret_receiver) = mpsc::channel(); let worker_thread = thread::spawn(move || { let worker = Worker { data_vec: vec![] }; run(worker, receiver, ret_sender); }); WorkerHandle { sender, ret_receiver, thread: worker_thread } } } fn run(mut worker: Worker, receiver: Receiver , ret_sender: Sender ) { for msg in receiver.iter() { match msg { Msg::Push { data } => { worker.data_vec.push(data); } Msg::Pop => { worker.data_vec.pop(); } Msg::Get(i) => { // `Option::cloned` - от `Option<&T>` към `Option ` // като клонира вътрешността let elem = worker.data_vec.get(i).cloned(); // Не можем да изпратим - явно получателя се е отказал. // Игнорираме грешката let _ = ret_sender.send(RetMsg::GetResult(elem)); } } } } struct Worker { data_vec: Vec , } fn main() { let handle = WorkerHandle::spawn(); handle.sender.send(Msg::Push { data: 123 }).expect("worker died"); handle.sender.send(Msg::Get(0)).expect("workder died"); // По-късно let RetMsg::GetResult(elem) = handle.ret_receiver.recv().expect("worker died"); println!("{:?}", elem); }
"Worker threads"
Двупосочна комуникация
Недостатъка на втория подход е, имаме само един получател, защото Receiver<RetMsg>
не може да се клонира.
Ако искаме да изпращаме съобщения от няколко нишки и всяка нишка да има отделен канал за отговори, можем да го направим като запазваме тези канали в WorkerHandle
-а.
use std::thread;
use std::sync::mpsc::{self, Sender, Receiver};
pub enum Msg {
Push { data: u32 },
Pop,
Get(usize, Sender<RetMsg>),
}
pub enum RetMsg {
GetResult(Option<u32>),
}
pub struct WorkerHandle {
pub sender: Sender<Msg>,
pub ret_sender: Sender<RetMsg>,
pub ret_receiver: Receiver<RetMsg>,
// JoinHandle не може да се клонира.
// Бихме могли да си поиграем с API-то, но за сега
// ще го игнорираме и ще се фокусираме върху каналите
// pub thread: JoinHandle<()>,
}
impl Clone for WorkerHandle {
fn clone(&self) -> Self {
let (ret_sender, ret_receiver) = mpsc::channel();
WorkerHandle { sender: self.sender.clone(), ret_sender, ret_receiver }
}
}
impl WorkerHandle {
pub fn spawn() -> WorkerHandle {
let (sender, receiver) = mpsc::channel();
let _worker_thread = thread::spawn(move || {
let worker = Worker { data_vec: vec![] };
run(worker, receiver);
});
let (ret_sender, ret_receiver) = mpsc::channel();
WorkerHandle { sender, ret_sender, ret_receiver }
}
}
fn run(mut worker: Worker, receiver: Receiver<Msg>) {
for msg in receiver.iter() {
match msg {
Msg::Push { data } => {
worker.data_vec.push(data);
}
Msg::Pop => {
worker.data_vec.pop();
}
Msg::Get(i, ret_sender) => {
// `Option::cloned` - от `Option<&T>` към `Option<T>`
// като клонира вътрешността
let elem = worker.data_vec.get(i).cloned();
// Не можем да изпратим - явно получателя се е отказал.
// Игнорираме грешката
let _ = ret_sender.send(RetMsg::GetResult(elem));
}
}
}
}
struct Worker {
data_vec: Vec<u32>,
}
fn main() {
let handle = WorkerHandle::spawn();
handle.sender.send(Msg::Push { data: 123 }).expect("worker died");
handle.sender.send(Msg::Get(0, handle.ret_sender.clone())).expect("workder died");
// По-късно
let RetMsg::GetResult(elem) = handle.ret_receiver.recv().expect("worker died");
println!("{:?}", elem);
}
Some(123)
use std::thread; use std::sync::mpsc::{self, Sender, Receiver}; pub enum Msg { Push { data: u32 }, Pop, Get(usize, Sender), } pub enum RetMsg { GetResult(Option ), } pub struct WorkerHandle { pub sender: Sender , pub ret_sender: Sender , pub ret_receiver: Receiver , // JoinHandle не може да се клонира. // Бихме могли да си поиграем с API-то, но за сега // ще го игнорираме и ще се фокусираме върху каналите // pub thread: JoinHandle<()>, } impl Clone for WorkerHandle { fn clone(&self) -> Self { let (ret_sender, ret_receiver) = mpsc::channel(); WorkerHandle { sender: self.sender.clone(), ret_sender, ret_receiver } } } impl WorkerHandle { pub fn spawn() -> WorkerHandle { let (sender, receiver) = mpsc::channel(); let _worker_thread = thread::spawn(move || { let worker = Worker { data_vec: vec![] }; run(worker, receiver); }); let (ret_sender, ret_receiver) = mpsc::channel(); WorkerHandle { sender, ret_sender, ret_receiver } } } fn run(mut worker: Worker, receiver: Receiver ) { for msg in receiver.iter() { match msg { Msg::Push { data } => { worker.data_vec.push(data); } Msg::Pop => { worker.data_vec.pop(); } Msg::Get(i, ret_sender) => { // `Option::cloned` - от `Option<&T>` към `Option ` // като клонира вътрешността let elem = worker.data_vec.get(i).cloned(); // Не можем да изпратим - явно получателя се е отказал. // Игнорираме грешката let _ = ret_sender.send(RetMsg::GetResult(elem)); } } } } struct Worker { data_vec: Vec , } fn main() { let handle = WorkerHandle::spawn(); handle.sender.send(Msg::Push { data: 123 }).expect("worker died"); handle.sender.send(Msg::Get(0, handle.ret_sender.clone())).expect("workder died"); // По-късно let RetMsg::GetResult(elem) = handle.ret_receiver.recv().expect("worker died"); println!("{:?}", elem); }
"Worker threads"
Обобщение
- така описаните worker нишки имат множество недостатъци
- абстракцията става бавна и неудобна при повечко worker-и
- но при ограничен брой е удобен начин за организиране на кода
- повечето от недостатъците могат да се преодолеят със специални runtime-и и библиотеки
- тука се приближаваме към парадигмата за "актьори" (actor model)
- но за момента не мога да препоръчам rust библиотека, която имплементира това