Rayon

01 декември 2022

Rayon

Въведение

Rayon

Въведение

Rayon

Въведение

Rayon

Въведение

Rayon

Как да го ползваме?

Rayon

Как да го ползваме?

Rayon

Как да го ползваме?

Rayon

Как да го ползваме?

Rayon

Как да го ползваме?

Rayon

Как да го ползваме?

Rayon

Rayon пример

1 2 3 4 5 6 7 8
use rayon::prelude::*;

fn main() {
    (1..10).into_par_iter()
        .for_each(|p| {
            println!("Executing {:}", p);
        });
}
Executing 1 Executing 4 Executing 7 Executing 6 Executing 8 Executing 9 Executing 5 Executing 3 Executing 2
extern crate rayon;
use rayon::prelude::*;

fn main() {
    (1..10).into_par_iter()
        .for_each(|p| {
            println!("Executing {:}", p);
        });
}

Rayon

Rayon пример с mutability

1 2 3 4 5 6 7
use rayon::prelude::*;

fn main() {
    let mut arr = [0, 7, 9, 11];
    arr.par_iter_mut().for_each(|p| *p -= 1);
    println!("{:?}", arr);
}
[-1, 6, 8, 10]
extern crate rayon;
use rayon::prelude::*;

fn main() {
    let mut arr = [0, 7, 9, 11];
    arr.par_iter_mut().for_each(|p| *p -= 1);
    println!("{:?}", arr);
}

Rayon

Rayon пример с each и any

1 2 3 4 5 6 7 8 9 10
use rayon::prelude::*;

fn main() {
    let vec = vec![2, 4, 6, 8];

    assert!(!vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(!vec.par_iter().any(|n| *n > 8 ));
    assert!(vec.par_iter().all(|n| *n <= 8 ));
}
extern crate rayon;
use rayon::prelude::*;

fn main() {
    let vec = vec![2, 4, 6, 8];

    assert!(!vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(!vec.par_iter().any(|n| *n > 8 ));
    assert!(vec.par_iter().all(|n| *n <= 8 ));
}

Rayon

За да ползвате паралелните итератори в Rayon

Обикновено всичко което трябва да направите е:

Rayon

За да ползвате паралелните итератори в Rayon

Обикновено всичко което трябва да направите е:

Rayon

За да ползвате паралелните итератори в Rayon

Обикновено всичко което трябва да направите е:

Rayon

За да ползвате паралелните итератори в Rayon

Обикновено всичко което трябва да направите е:

Rayon

За да ползвате паралелните итератори в Rayon

Обикновено всичко което трябва да направите е:

Rayon

За да ползвате паралелните итератори в Rayon

Обикновено всичко което трябва да направите е:

Rayon

Да разгледаме типичната грешка

1 2 3 4 5 6 7 8 9
use rayon::prelude::*;

fn main() {
    let mut total = 0;
    (1..10).into_par_iter()
        .for_each(|p| {
            total += p;
        });
}
error[E0594]: cannot assign to `total`, as it is a captured variable in a `Fn` closure --> src/bin/main_530cd1344e9a9c93a61a57075d2aed005ad3578e.rs:8:13 | 8 | total += p; | ^^^^^^^^^^ cannot assign For more information about this error, try `rustc --explain E0594`. error: could not compile `rust` due to previous error
extern crate rayon;
use rayon::prelude::*;

fn main() {
    let mut total = 0;
    (1..10).into_par_iter()
        .for_each(|p| {
            total += p;
        });
}

Rayon

Как е дефиниран паралелния итератор

1 2 3 4 5 6 7
pub trait ParallelIterator: Sized + Send {
    type Item: Send;
    //...
    fn for_each<OP>(self, op: OP)
        where OP: Fn(Self::Item) + Sync + Send,
    //...
}
1

Rayon

Да разгледаме типичната грешка - едно решение

1 2 3 4 5 6 7 8 9 10
use rayon::prelude::*;
use std::sync::atomic::{AtomicI32, Ordering};

fn main() {
    let mut total = AtomicI32::new(0);
    (1..10).into_par_iter()
        .for_each(|p| {
            total.fetch_add(p, Ordering::SeqCst);
        });
}
extern crate rayon;
use rayon::prelude::*;
use std::sync::atomic::{AtomicI32, Ordering};

fn main() {
    let mut total = AtomicI32::new(0);
    (1..10).into_par_iter()
        .for_each(|p| {
            total.fetch_add(p, Ordering::SeqCst);
        });
}

Rayon

Решение чрез fold

1 2 3 4 5 6 7 8 9 10
use rayon::prelude::*;

fn main() {
    let numbers = 0..=100;
    let sum = numbers.into_par_iter()
        .fold(|| 0, |a, b| a + b)
        .sum::<i32>();

    assert_eq!(sum, 5050);
}
extern crate rayon;
use rayon::prelude::*;

fn main() {
    let numbers = 0..=100;
    let sum = numbers.into_par_iter()
        .fold(|| 0, |a, b| a + b)
        .sum::();

    assert_eq!(sum, 5050);
}

Rayon

Решение чрез reduce

Rayon

Решение чрез reduce

1 2 3 4 5 6 7 8 9
use rayon::prelude::*;

fn main() {
    let numbers = 0..=100;
    let sum = numbers.into_par_iter()
        .reduce(|| 0, |a, b| (a + b));

    assert_eq!(sum, 5050);
}
extern crate rayon;
use rayon::prelude::*;

fn main() {
    let numbers = 0..=100;
    let sum = numbers.into_par_iter()
        .reduce(|| 0, |a, b| (a + b));

    assert_eq!(sum, 5050);
}

Rayon

Решение чрез try_reduce с проверка за overflow

1 2 3 4 5 6 7 8 9 10
// Compute the sum of squares, being careful about overflow.
fn sum_squares<I: IntoParallelIterator<Item = i32>>(iter: I) -> Option<i32> {
    iter
        .into_par_iter()
        .map(|i| i.checked_mul(i))            // square each item,
        .try_reduce(|| 0, i32::checked_add)   // and add them up!
}
assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16));
assert_eq!(sum_squares(0..10_000), None);
assert_eq!(sum_squares(1_000_000..1_000_001), None);

Rayon

Паралелно сортиране

Rayon

Паралелно сортиране

1 2 3 4 5 6 7 8 9 10
let mut v:[i32; 5] = [-5, 4, 1, -3, 2];

v.par_sort();
assert_eq!(v, [-5, -3, 1, 2, 4]);

v.par_sort_by(|a, b| b.cmp(a));
assert_eq!(v, [4, 2, 1, -3, -5]);

v.par_sort_by_key(|k| k.abs());
assert_eq!(v, [1, 2, -3, 4, -5]);

Rayon

Паралелните операции трябва да се ползват разумно

Rayon

Паралелните операции трябва да се ползват разумно

1 2
let chunks: Vec<_> = [1, 2, 3, 4, 5, 6].par_chunks(3).collect();
assert_eq!(chunks, vec![&[1, 2, 3], &[4, 5, 6]]);

Rayon

Едно сравнение

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
fn calc_something(i: i32) -> i32 { i & (i >> 1) }

#[inline(always)]
fn method_rayon_no_chunks(contents: &[i32]) -> i32 {
    contents.
        par_iter().
        map(|i| calc_something(*i)).
        sum()
}

#[inline(always)]
fn method_rayon_chunks(contents: &[i32]) -> i32 {
    contents.
        par_chunks(12).
        map(|chunk| {method_rayon_no_chunks(chunk)}).
        sum()
}

Rayon

Създаване на паралелен итератор чрез par_bridge

Rayon

Създаване на паралелен итератор чрез par_bridge

Rayon

Създаване на паралелен итератор чрез par_bridge

Rayon

Създаване на паралелен итератор чрез par_bridge

Rayon

Създаване на паралелен итератор чрез par_bridge

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;
use std::sync::mpsc::channel;

fn main() {
    let rx = {
        let (tx, rx) = channel();
        tx.send("one!").unwrap();
        tx.send("two!").unwrap();
        tx.send("three!").unwrap();
        rx
    };
    let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
    output.sort();
    assert_eq!(&*output, &["one!", "three!", "two!"]);
}
extern crate rayon;
use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;
use std::sync::mpsc::channel;

fn main() {
    let rx = {
        let (tx, rx) = channel();
        tx.send("one!").unwrap();
        tx.send("two!").unwrap();
        tx.send("three!").unwrap();
        rx
    };
    let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
    output.sort();
    assert_eq!(&*output, &["one!", "three!", "two!"]);
}

Rayon

rayon::join

Rayon

rayon::join

Rayon

rayon::join

Rayon

rayon::join

1 2 3 4 5 6 7 8 9 10 11
fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        rayon::join(|| quick_sort(lo), || quick_sort(hi));
    }
}

fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
    // see https://en.wikipedia.org/wiki/Quicksort#Lomuto_partition_scheme
}

Rayon

rayon::join - Наивна имплементация на фибоначи

1 2 3 4 5 6 7
fn fib(n: u32) -> u32 {
    if n < 2 {
        return n;
    }
    let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2));
    a + b
}

Rayon

ThreadPool-s

Rayon

ThreadPool-s

1 2 3 4 5 6 7
let size = 1024 * 1024;
    let number_of_threads = 2;
    rayon::ThreadPoolBuilder::new()
        .stack_size(size)
        .num_threads(number_of_threads)
        .build_global()
        .unwrap();
extern crate rayon;
fn main() {

    let size = 1024 * 1024;
    let number_of_threads = 2;
    rayon::ThreadPoolBuilder::new()
        .stack_size(size)
        .num_threads(number_of_threads)
        .build_global()
        .unwrap();
}

Rayon

ThreadPool-s

Rayon

ThreadPool-s

1 2 3 4 5 6 7 8
use std::thread;
    let y = 1;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();
    pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
    pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
use std::thread;
fn main() {
    let y = 1;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();
    pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
    pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id())); 
}

Rayon

ThreadPool-s

1 2 3 4 5 6 7 8
use std::thread;
    let y = 1;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();
    pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
    pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
use std::thread;
fn main() {
    let y = 1;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();
    pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id()));
    pool.spawn(|| println!("Task executes on thread: {:?}", thread::current().id())); 
}

Rayon

scope и spawn

Rayon

scope и spawn

Rayon

scope и spawn

Rayon

scope и spawn

Rayon

scope и spawn

1 2 3 4 5
pool.scope(move |s| {
    s.spawn(move |s| { 
        println!("Task executes on thread: {} {:?}", y, thread::current().id()) 
    })
});

Rayon

Ресурси

Въпроси