如何使用 Condvar 来限制多线程?
How do I use a Condvar to limit multithreading?
我正在尝试使用 Condvar
来限制在任何给定时间处于活动状态的线程数。我很难找到关于如何使用 Condvar
的好例子。到目前为止我有:
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
let thread_count_arc = Arc::new((Mutex::new(0), Condvar::new()));
let mut i = 0;
while i < 100 {
let thread_count = thread_count_arc.clone();
thread::spawn(move || {
let &(ref num, ref cvar) = &*thread_count;
{
let mut start = num.lock().unwrap();
if *start >= 20 {
cvar.wait(start);
}
*start += 1;
}
println!("hello");
cvar.notify_one();
});
i += 1;
}
}
给出的编译器错误是:
error[E0382]: use of moved value: `start`
--> src/main.rs:16:18
|
14 | cvar.wait(start);
| ----- value moved here
15 | }
16 | *start += 1;
| ^^^^^ value used here after move
|
= note: move occurs because `start` has type `std::sync::MutexGuard<'_, i32>`, which does not implement the `Copy` trait
我完全不确定我对 Condvar
的使用是否正确。我尝试尽可能接近 Rust API 上的示例。 Wwat 是实现这个的正确方法吗?
这里是编译的版本:
use std::{
sync::{Arc, Condvar, Mutex},
thread,
};
fn main() {
let thread_count_arc = Arc::new((Mutex::new(0u8), Condvar::new()));
let mut i = 0;
while i < 100 {
let thread_count = thread_count_arc.clone();
thread::spawn(move || {
let (num, cvar) = &*thread_count;
let mut start = cvar
.wait_while(num.lock().unwrap(), |start| *start >= 20)
.unwrap();
// Before Rust 1.42, use this:
//
// let mut start = num.lock().unwrap();
// while *start >= 20 {
// start = cvar.wait(start).unwrap()
// }
*start += 1;
println!("hello");
cvar.notify_one();
});
i += 1;
}
}
重要的部分可以从Condvar::wait_while
or Condvar::wait
的签名看出:
pub fn wait_while<'a, T, F>(
&self,
guard: MutexGuard<'a, T>,
condition: F
) -> LockResult<MutexGuard<'a, T>>
where
F: FnMut(&mut T) -> bool,
pub fn wait<'a, T>(
&self,
guard: MutexGuard<'a, T>
) -> LockResult<MutexGuard<'a, T>>
这表示 wait_while
/ wait
消耗 guard
,这就是为什么你会得到你所做的错误 - 你不再拥有start
,所以你不能在上面调用任何方法!
这些函数很好地反映了 Condvar
的工作方式 - 您暂时放弃对 Mutex
(由 start
表示)的锁定,然后函数 returns 你再次获得锁。
修复方法是放弃锁,然后从 wait_while
/ wait
中获取锁守卫 return 值。我也从 if
切换到 while
,如 .
我意识到我提供的代码并没有完全按照我的要求执行,所以我将此修改 放在这里以供将来参考。
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
let thread_count_arc = Arc::new((Mutex::new(0u8), Condvar::new()));
let mut i = 0;
while i < 150 {
let thread_count = thread_count_arc.clone();
thread::spawn(move || {
let x;
let &(ref num, ref cvar) = &*thread_count;
{
let start = num.lock().unwrap();
let mut start = if *start >= 20 {
cvar.wait(start).unwrap()
} else {
start
};
*start += 1;
x = *start;
}
println!("{}", x);
{
let mut counter = num.lock().unwrap();
*counter -= 1;
}
cvar.notify_one();
});
i += 1;
}
println!("done");
}
运行 操场上的这个应该或多或少表现出预期的行为。
作为参考,在给定范围内限制线程数量的常用方法是使用 Semaphore
.
不幸的是,Semaphore
从未稳定下来,在 Rust 1.8 中被弃用并在 Rust 1.9 中被删除。有可用的板条箱在其他并发原语之上添加信号量。
let sema = Arc::new(Semaphore::new(20));
for i in 0..100 {
let sema = sema.clone();
thread::spawn(move || {
let _guard = sema.acquire();
println!("{}", i);
})
}
这不是完全相同的事情:因为当线程进入范围时,每个线程都不会打印范围内的线程总数。
您想使用 while
循环,并在每次迭代时重新分配 start
,例如:
fn main() {
let thread_count_arc = Arc::new((Mutex::new(0), Condvar::new()));
let mut i = 0;
while i < 100 {
let thread_count = thread_count_arc.clone();
thread::spawn(move || {
let &(ref num, ref cvar) = &*thread_count;
let mut start = num.lock().unwrap();
while *start >= 20 {
let current = cvar.wait(start).unwrap();
start = current;
}
*start += 1;
println!("hello");
cvar.notify_one();
});
i += 1;
}
}
另请参阅有关该主题的一些文章:
https://medium.com/@polyglot_factotum/rust-concurrency-five-easy-pieces-871f1c62906a
https://medium.com/@polyglot_factotum/rust-concurrency-patterns-condvars-and-locks-e278f18db74f
我正在尝试使用 Condvar
来限制在任何给定时间处于活动状态的线程数。我很难找到关于如何使用 Condvar
的好例子。到目前为止我有:
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
let thread_count_arc = Arc::new((Mutex::new(0), Condvar::new()));
let mut i = 0;
while i < 100 {
let thread_count = thread_count_arc.clone();
thread::spawn(move || {
let &(ref num, ref cvar) = &*thread_count;
{
let mut start = num.lock().unwrap();
if *start >= 20 {
cvar.wait(start);
}
*start += 1;
}
println!("hello");
cvar.notify_one();
});
i += 1;
}
}
给出的编译器错误是:
error[E0382]: use of moved value: `start`
--> src/main.rs:16:18
|
14 | cvar.wait(start);
| ----- value moved here
15 | }
16 | *start += 1;
| ^^^^^ value used here after move
|
= note: move occurs because `start` has type `std::sync::MutexGuard<'_, i32>`, which does not implement the `Copy` trait
我完全不确定我对 Condvar
的使用是否正确。我尝试尽可能接近 Rust API 上的示例。 Wwat 是实现这个的正确方法吗?
这里是编译的版本:
use std::{
sync::{Arc, Condvar, Mutex},
thread,
};
fn main() {
let thread_count_arc = Arc::new((Mutex::new(0u8), Condvar::new()));
let mut i = 0;
while i < 100 {
let thread_count = thread_count_arc.clone();
thread::spawn(move || {
let (num, cvar) = &*thread_count;
let mut start = cvar
.wait_while(num.lock().unwrap(), |start| *start >= 20)
.unwrap();
// Before Rust 1.42, use this:
//
// let mut start = num.lock().unwrap();
// while *start >= 20 {
// start = cvar.wait(start).unwrap()
// }
*start += 1;
println!("hello");
cvar.notify_one();
});
i += 1;
}
}
重要的部分可以从Condvar::wait_while
or Condvar::wait
的签名看出:
pub fn wait_while<'a, T, F>(
&self,
guard: MutexGuard<'a, T>,
condition: F
) -> LockResult<MutexGuard<'a, T>>
where
F: FnMut(&mut T) -> bool,
pub fn wait<'a, T>(
&self,
guard: MutexGuard<'a, T>
) -> LockResult<MutexGuard<'a, T>>
这表示 wait_while
/ wait
消耗 guard
,这就是为什么你会得到你所做的错误 - 你不再拥有start
,所以你不能在上面调用任何方法!
这些函数很好地反映了 Condvar
的工作方式 - 您暂时放弃对 Mutex
(由 start
表示)的锁定,然后函数 returns 你再次获得锁。
修复方法是放弃锁,然后从 wait_while
/ wait
中获取锁守卫 return 值。我也从 if
切换到 while
,如
我意识到我提供的代码并没有完全按照我的要求执行,所以我将此修改
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
let thread_count_arc = Arc::new((Mutex::new(0u8), Condvar::new()));
let mut i = 0;
while i < 150 {
let thread_count = thread_count_arc.clone();
thread::spawn(move || {
let x;
let &(ref num, ref cvar) = &*thread_count;
{
let start = num.lock().unwrap();
let mut start = if *start >= 20 {
cvar.wait(start).unwrap()
} else {
start
};
*start += 1;
x = *start;
}
println!("{}", x);
{
let mut counter = num.lock().unwrap();
*counter -= 1;
}
cvar.notify_one();
});
i += 1;
}
println!("done");
}
运行 操场上的这个应该或多或少表现出预期的行为。
作为参考,在给定范围内限制线程数量的常用方法是使用 Semaphore
.
不幸的是,Semaphore
从未稳定下来,在 Rust 1.8 中被弃用并在 Rust 1.9 中被删除。有可用的板条箱在其他并发原语之上添加信号量。
let sema = Arc::new(Semaphore::new(20));
for i in 0..100 {
let sema = sema.clone();
thread::spawn(move || {
let _guard = sema.acquire();
println!("{}", i);
})
}
这不是完全相同的事情:因为当线程进入范围时,每个线程都不会打印范围内的线程总数。
您想使用 while
循环,并在每次迭代时重新分配 start
,例如:
fn main() {
let thread_count_arc = Arc::new((Mutex::new(0), Condvar::new()));
let mut i = 0;
while i < 100 {
let thread_count = thread_count_arc.clone();
thread::spawn(move || {
let &(ref num, ref cvar) = &*thread_count;
let mut start = num.lock().unwrap();
while *start >= 20 {
let current = cvar.wait(start).unwrap();
start = current;
}
*start += 1;
println!("hello");
cvar.notify_one();
});
i += 1;
}
}
另请参阅有关该主题的一些文章:
https://medium.com/@polyglot_factotum/rust-concurrency-five-easy-pieces-871f1c62906a
https://medium.com/@polyglot_factotum/rust-concurrency-patterns-condvars-and-locks-e278f18db74f