如何精细控制人造丝的调度?
How to finely control the scheduling of rayon?
在 this problem 中,我找到了填充 Python 的多处理池 (test4
) 的方法。然后我想起了 rayon 以迭代器的方式实现了并行性。所以我尝试在人造丝中实现相同的逻辑。这是代码:
use rayon::prelude::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let now = Instant::now();
let table = Arc::new(Mutex::new(vec![b"| ".repeat(20); 8]));
let write_table = |msg: &[u8]| {
let mut table = table.lock().unwrap();
let thread_idx = rayon::current_thread_index().unwrap();
let time_idx = now.elapsed().as_secs() as usize * 2 + 1;
table[thread_idx][time_idx..time_idx + msg.len()].clone_from_slice(msg);
};
rayon::ThreadPoolBuilder::new()
.num_threads(8)
.build_global()
.unwrap();
(0i32..10)
.into_par_iter()
.map(|_| {
write_table(b"b b b");
thread::sleep(Duration::from_secs(3));
(0i32..3).into_par_iter()
})
.flatten()
.map(|_| {
write_table(b"s");
thread::sleep(Duration::from_secs(1));
})
.collect::<Vec<_>>();
println!("##### total: {}s #####", now.elapsed().as_secs());
println!(
"{}",
table
.lock()
.unwrap()
.iter()
.map(|r| std::str::from_utf8(r).unwrap())
.collect::<Vec<_>>()
.join("\n")
);
}
但事实证明默认情况下人造丝比我想象的要懒惰。这是代码的输出:
##### total: 10s #####
|b b b|s|s|s|b b b|s| | | | | | | | | |
|b b b|s|s|s| | | |s| | | | | | | | | |
|b b b|s|s|s| | | | | | | | | | | | | |
|b b b|s|s|s| | | |s| | | | | | | | | |
|b b b|s|s|s|b b b|s| | | | | | | | | |
|b b b|s|s|s| | | |s| | | | | | | | | |
|b b b|s|s|s| | | |s| | | | | | | | | |
|b b b|s|s|s| | | | | | | | | | | | | |
你可以看到一些大任务安排在一些小任务之后。结果,线程池没有得到充分利用。那么如何精细控制人造丝的调度才能充分利用呢?
更新
来自评论:
The procedure in Python's process pool is, when the imap is called, it starts executing on the background. If we call next
on the iterator, we will block until the result is returned. This requires an additional queue to store the results and the execution should be async (not sure). I was wondering if we can easily achieve this in Rayon.
折腾了一下,发现Rayon居然提供了一些方便的方法帮我实现,比如for_each_with
和par_bridge
。我终于得到了这个版本:Rust Playground。但它不稳定。有时它会得到更好的结果,如下所示:
##### total: 9s #####
|b b b|b b b|s|s| | | | | | | | | | | |
|b b b|s|s|s|s|s|s| | | | | | | | | | |
|b b b|s|s|s|s|s| | | | | | | | | | | |
|b b b|s|s|s|s|s| | | | | | | | | | | |
|b b b|s|s|s|s|s| | | | | | | | | | | |
|b b b|b b b|s|s|s| | | | | | | | | | |
| | | | | | |s|s| | | | | | | | | | | |
|b b b|b b b|s|s| | | | | | | | | | | |
有时情况会变得更糟。所以我想这可能是 Rayon 中的反模式?
Rayon 的调度策略被称为“工作窃取”。它的原理是任务指定它们可以运行并行的点;如果我们查看提供的接口 rayon::join()
,我们可以看到至少一种方法是指定两个函数,它们是并行 运行ning 的候选函数。默认情况下,这两个闭包顺序为 运行。但是,如果 Rayon 线程池中的线程没有任何工作要做,它会查看 pair 对中的 second 闭包,并将其“窃取”到 运行 在那个免费线程上。
这种策略有很多优点,主要在于线程之间没有通信开销(以及将要处理的数据移动到另一个内核),除非这会启用额外的并行性。
在您的特定用例中,这恰好会产生较差的利用率模式,因为每个 b
任务会分散到许多 s
任务中,因此最好完成所有 b
尽快完成任务,但 rayon
更喜欢在与他们的 b
任务相同的线程上完成所有 s
任务 ,除非有空闲线程 ,那里不是。
不幸的是,我认为不可能让 Rayon 的并行迭代为您的用例很好地执行调度。你的情况受益于尽可能多地做 b
task-starting 而忽略它之后的潜在 sequentially-runnable s
activity ,这与人造丝相反假设是可取的。
但是,我们可以跳出迭代器接口并显式生成并行任务,然后通过通道将它们的输出提供给并行桥。这样,b
任务就不会被视为在其父 s
任务之后按顺序执行的候选对象。
let (b_out, s_in) = crossbeam::channel::unbounded();
let mut final_output: Vec<()> = vec![];
rayon::scope(|scope| {
for _ in 0i32..10 {
let b_out = b_out.clone();
scope.spawn(move |_| {
write_table(b"b b b");
thread::sleep(Duration::from_secs(3));
b_out.send((0i32..3).into_par_iter()).unwrap();
});
}
drop(b_out); // needed to ensure the channel is closed when it has no more items
final_output = s_in
.into_iter()
.par_bridge()
.flatten()
.map(|_| {
write_table(b"s");
thread::sleep(Duration::from_secs(1));
})
.collect();
});
这会浪费其中一个线程,可能是因为它大部分时间都在 s_in.into_iter().next()
上阻塞。这可以通过
来避免
- 对
s
任务也使用 spawn() 而不是通道,假设您实际上不需要收集任何输出(或者可以为这些输出使用通道),或者
- 创建一个带有 1 个额外线程的线程池(它将花费大部分时间阻塞而不是竞争 CPU 时间)。
但是,它仍然比您开始时的计划要快(9 秒而不是 10 秒)。
如果您实际上没有任何工作可以从比您所展示的进一步细分中获益,您可能需要考虑完全避免 Rayon 并 运行建立您自己的线程池。 Rayon 是一个很棒的通用工具,但如果您的工作负载具有已知且简单的形状,您应该能够使用自定义代码超越它。
在 this problem 中,我找到了填充 Python 的多处理池 (test4
) 的方法。然后我想起了 rayon 以迭代器的方式实现了并行性。所以我尝试在人造丝中实现相同的逻辑。这是代码:
use rayon::prelude::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let now = Instant::now();
let table = Arc::new(Mutex::new(vec![b"| ".repeat(20); 8]));
let write_table = |msg: &[u8]| {
let mut table = table.lock().unwrap();
let thread_idx = rayon::current_thread_index().unwrap();
let time_idx = now.elapsed().as_secs() as usize * 2 + 1;
table[thread_idx][time_idx..time_idx + msg.len()].clone_from_slice(msg);
};
rayon::ThreadPoolBuilder::new()
.num_threads(8)
.build_global()
.unwrap();
(0i32..10)
.into_par_iter()
.map(|_| {
write_table(b"b b b");
thread::sleep(Duration::from_secs(3));
(0i32..3).into_par_iter()
})
.flatten()
.map(|_| {
write_table(b"s");
thread::sleep(Duration::from_secs(1));
})
.collect::<Vec<_>>();
println!("##### total: {}s #####", now.elapsed().as_secs());
println!(
"{}",
table
.lock()
.unwrap()
.iter()
.map(|r| std::str::from_utf8(r).unwrap())
.collect::<Vec<_>>()
.join("\n")
);
}
但事实证明默认情况下人造丝比我想象的要懒惰。这是代码的输出:
##### total: 10s #####
|b b b|s|s|s|b b b|s| | | | | | | | | |
|b b b|s|s|s| | | |s| | | | | | | | | |
|b b b|s|s|s| | | | | | | | | | | | | |
|b b b|s|s|s| | | |s| | | | | | | | | |
|b b b|s|s|s|b b b|s| | | | | | | | | |
|b b b|s|s|s| | | |s| | | | | | | | | |
|b b b|s|s|s| | | |s| | | | | | | | | |
|b b b|s|s|s| | | | | | | | | | | | | |
你可以看到一些大任务安排在一些小任务之后。结果,线程池没有得到充分利用。那么如何精细控制人造丝的调度才能充分利用呢?
更新
来自评论:
The procedure in Python's process pool is, when the imap is called, it starts executing on the background. If we call
next
on the iterator, we will block until the result is returned. This requires an additional queue to store the results and the execution should be async (not sure). I was wondering if we can easily achieve this in Rayon.
折腾了一下,发现Rayon居然提供了一些方便的方法帮我实现,比如for_each_with
和par_bridge
。我终于得到了这个版本:Rust Playground。但它不稳定。有时它会得到更好的结果,如下所示:
##### total: 9s #####
|b b b|b b b|s|s| | | | | | | | | | | |
|b b b|s|s|s|s|s|s| | | | | | | | | | |
|b b b|s|s|s|s|s| | | | | | | | | | | |
|b b b|s|s|s|s|s| | | | | | | | | | | |
|b b b|s|s|s|s|s| | | | | | | | | | | |
|b b b|b b b|s|s|s| | | | | | | | | | |
| | | | | | |s|s| | | | | | | | | | | |
|b b b|b b b|s|s| | | | | | | | | | | |
有时情况会变得更糟。所以我想这可能是 Rayon 中的反模式?
Rayon 的调度策略被称为“工作窃取”。它的原理是任务指定它们可以运行并行的点;如果我们查看提供的接口 rayon::join()
,我们可以看到至少一种方法是指定两个函数,它们是并行 运行ning 的候选函数。默认情况下,这两个闭包顺序为 运行。但是,如果 Rayon 线程池中的线程没有任何工作要做,它会查看 pair 对中的 second 闭包,并将其“窃取”到 运行 在那个免费线程上。
这种策略有很多优点,主要在于线程之间没有通信开销(以及将要处理的数据移动到另一个内核),除非这会启用额外的并行性。
在您的特定用例中,这恰好会产生较差的利用率模式,因为每个 b
任务会分散到许多 s
任务中,因此最好完成所有 b
尽快完成任务,但 rayon
更喜欢在与他们的 b
任务相同的线程上完成所有 s
任务 ,除非有空闲线程 ,那里不是。
不幸的是,我认为不可能让 Rayon 的并行迭代为您的用例很好地执行调度。你的情况受益于尽可能多地做 b
task-starting 而忽略它之后的潜在 sequentially-runnable s
activity ,这与人造丝相反假设是可取的。
但是,我们可以跳出迭代器接口并显式生成并行任务,然后通过通道将它们的输出提供给并行桥。这样,b
任务就不会被视为在其父 s
任务之后按顺序执行的候选对象。
let (b_out, s_in) = crossbeam::channel::unbounded();
let mut final_output: Vec<()> = vec![];
rayon::scope(|scope| {
for _ in 0i32..10 {
let b_out = b_out.clone();
scope.spawn(move |_| {
write_table(b"b b b");
thread::sleep(Duration::from_secs(3));
b_out.send((0i32..3).into_par_iter()).unwrap();
});
}
drop(b_out); // needed to ensure the channel is closed when it has no more items
final_output = s_in
.into_iter()
.par_bridge()
.flatten()
.map(|_| {
write_table(b"s");
thread::sleep(Duration::from_secs(1));
})
.collect();
});
这会浪费其中一个线程,可能是因为它大部分时间都在 s_in.into_iter().next()
上阻塞。这可以通过
- 对
s
任务也使用 spawn() 而不是通道,假设您实际上不需要收集任何输出(或者可以为这些输出使用通道),或者 - 创建一个带有 1 个额外线程的线程池(它将花费大部分时间阻塞而不是竞争 CPU 时间)。
但是,它仍然比您开始时的计划要快(9 秒而不是 10 秒)。
如果您实际上没有任何工作可以从比您所展示的进一步细分中获益,您可能需要考虑完全避免 Rayon 并 运行建立您自己的线程池。 Rayon 是一个很棒的通用工具,但如果您的工作负载具有已知且简单的形状,您应该能够使用自定义代码超越它。