我怎样才能并行化这段代码?
How can I parallelize this code?
我最初在 ruby 中写过这个,然后发现 MRI 根本不支持并行执行。所以我用 rust 重写了它,但我对各个部分的所有权有问题。
extern crate rand;
extern crate csv;
extern crate num_cpus;
use std::fs::File;
use csv::Reader;
use rand::{thread_rng, sample};
use std::thread;
use std::str::from_utf8;
use std::io::{self, Write};
use csv::index::{Indexed, create_index};
fn met_n_in_common(n:usize,csv:&mut Reader<File>)->usize{
csv.byte_records().map(|r| if(from_utf8(r.unwrap().get(n).unwrap()).unwrap() == "TRUE"){1}else{0}).fold(0usize, |sum, i| sum + i)
}
fn mets_in_common(csv:&mut Reader<File>,current_set_length:usize)->usize {
(0..csv.headers().unwrap().len()).map(|i| if(i == 0){0}else{met_n_in_common(i,csv)} ).filter(|&e| e==current_set_length ).count()
}
fn main() {
let csv_s = || csv::Reader::from_file("/Users/camdennarzt/Documents/All 7000 series-Table 1-1-1-3.csv").unwrap();
let mut csv = csv_s();
let mut index_data = io::Cursor::new(Vec::new());
create_index(csv_s(), index_data.by_ref()).unwrap();
let mut index = Indexed::open(csv_s(), index_data).unwrap();
let mut tried_indices = Vec::new();
let mut threads : Vec<_> = (0..num_cpus::get()).map(|i|{
thread::spawn(move || {
let mut best_set : Vec<Vec<String>> = Vec::new();
let mut best_count = 0;
let mut rng = thread_rng();
let mut indices = Vec::new();
let limit = 2usize.pow(10)/num_cpus::get();
for _ in (0..limit) {
while {
let count = *sample(&mut rng, 13..83, 1).first().unwrap();
indices = sample(&mut rng, 1..83, count);
tried_indices.contains(&indices)
}{}
tried_indices.push(indices.to_owned());
let current_set:Vec<_> = indices.iter().map(|&i|{
index.seek(i).unwrap();
index.records().next().unwrap().unwrap()
}).collect();
let current_count = mets_in_common(&mut csv,current_set.len());
if (current_count > best_count){
best_count = current_count;
best_set = current_set;
}
}
(best_count,best_set.iter().map(|r| *r.first().unwrap()).collect::<Vec<String>>())
})
}).collect();
}
特别是当我编译这个(rust 1.2 稳定版)时,我得到:
-*- mode: compilation; default-directory: "~/Developer/Rust/optimal_subset_finder/src/" -*-
Compilation started at Wed Aug 19 14:55:10
cargo build
Compiling optimal_subset_finder v0.1.0 (file:///Users/camdennarzt/Developer/Rust/optimal_subset_finder)
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31 thread::spawn(move || {
main.rs:32 let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33 let mut best_count = 0;
main.rs:34 let mut rng = thread_rng();
main.rs:35 let mut indices = Vec::new();
main.rs:36 let limit = 2usize.pow(10)/num_cpus::get();
...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31 thread::spawn(move || {
main.rs:32 let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33 let mut best_count = 0;
main.rs:34 let mut rng = thread_rng();
main.rs:35 let mut indices = Vec::new();
main.rs:36 let limit = 2usize.pow(10)/num_cpus::get();
...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31 thread::spawn(move || {
main.rs:32 let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33 let mut best_count = 0;
main.rs:34 let mut rng = thread_rng();
main.rs:35 let mut indices = Vec::new();
main.rs:36 let limit = 2usize.pow(10)/num_cpus::get();
...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:55:49: 55:68 error: cannot move out of borrowed content
main.rs:55 (best_count,best_set.iter().map(|r| *r.first().unwrap()).collect::<Vec<String>>())
^~~~~~~~~~~~~~~~~~~
note: in expansion of closure expansion
main.rs:55:45: 55:68 note: expansion site
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
error: aborting due to 4 previous errors
Could not compile `optimal_subset_finder`.
To learn more, run the command again with --verbose.
Compilation exited abnormally with code 101 at Wed Aug 19 14:55:10
如果我完全注释掉线程,那么它会编译并运行。但是我无法解析有关使线程正常工作的文档。在知道如何解决这个问题时,这些错误也不是特别有用。
你的代码有几个问题。
首先,您显示的实际错误是由于您试图从 &Vec<String>
中获取 String
。如果没有克隆,这是不可能的,因为它需要将 String
移出 &Vec<String>
,但您不能移出引用数据。由于您的 best_set
是在内线程中使用的,并且在您 return 来自它的数据之后立即被删除,您可以使用 into_iter()
:
安全地使用它
best_set.into_iter()
.flat_map(|r| r.take(1))
.collect::<Vec<String>>()
然而,这是最少的问题。你在并发部分有很多错误。
首先,您试图在多线程中直接使用 index
。这是做不到的,因为 index
驻留在父线程栈中。父线程可以在其子线程之前终止(这确实是您的程序编译后会发生的情况),并且 index
在这种情况下将被销毁,因此子线程将访问垃圾数据。为了解决这个问题,您需要使用 Arc
with some kind of synchronization, like Mutex
。如果没有互斥量,您将在没有同步的情况下从多个线程写入访问相同的数据,这是一个会导致未定义行为的数据竞争的完美示例。
let index = Arc::new(Mutex::new(Indexed::open(csv_s(), index_data).unwrap()));
...
let index = index.clone();
thread::spawn(move || {
...
let current_set:Vec<_> = {
let index = index.lock();
indices.iter().map(|&i| {
index.seek(i).unwrap();
index.records().next().unwrap().unwrap()
}).collect()
};
...
});
您需要对 tried_indices
做同样的事情 - 您将来自多个线程的数据推送到同一个向量中,因此您需要某种同步来安全地完成它。您应该小心范围,以免锁定它的时间超过必要时间 - 请记住,当 return 由 lock()
方法执行的守卫超出范围时,互斥锁将被释放。
我能看到的最后一个问题,也是最严重的一个,是您从所有生成的线程中使用相同的 csv
。虽然这不仅因为可变访问而导致数据混乱,但它也是错误的,因为 reader 根据定义是一个可耗尽的数据源。如果你从多个线程读取它,即使不考虑并发问题,你也会从它读取的任何部分的不同部分获得绝对随机的数据。因此,即使将 reader 放入互斥量也无法解决问题。
我认为最简单的解决方案是为每个线程创建一个单独的 reader。幸运的是,您已经有一个创建 readers 的函数,因此只需在生成线程之前使用它:
let mut csv = csv_s();
thread::spawn(move || {
...
});
最后,您似乎没有使用线程 return 的计算结果,但您可能已经知道了。
我最初在 ruby 中写过这个,然后发现 MRI 根本不支持并行执行。所以我用 rust 重写了它,但我对各个部分的所有权有问题。
extern crate rand;
extern crate csv;
extern crate num_cpus;
use std::fs::File;
use csv::Reader;
use rand::{thread_rng, sample};
use std::thread;
use std::str::from_utf8;
use std::io::{self, Write};
use csv::index::{Indexed, create_index};
fn met_n_in_common(n:usize,csv:&mut Reader<File>)->usize{
csv.byte_records().map(|r| if(from_utf8(r.unwrap().get(n).unwrap()).unwrap() == "TRUE"){1}else{0}).fold(0usize, |sum, i| sum + i)
}
fn mets_in_common(csv:&mut Reader<File>,current_set_length:usize)->usize {
(0..csv.headers().unwrap().len()).map(|i| if(i == 0){0}else{met_n_in_common(i,csv)} ).filter(|&e| e==current_set_length ).count()
}
fn main() {
let csv_s = || csv::Reader::from_file("/Users/camdennarzt/Documents/All 7000 series-Table 1-1-1-3.csv").unwrap();
let mut csv = csv_s();
let mut index_data = io::Cursor::new(Vec::new());
create_index(csv_s(), index_data.by_ref()).unwrap();
let mut index = Indexed::open(csv_s(), index_data).unwrap();
let mut tried_indices = Vec::new();
let mut threads : Vec<_> = (0..num_cpus::get()).map(|i|{
thread::spawn(move || {
let mut best_set : Vec<Vec<String>> = Vec::new();
let mut best_count = 0;
let mut rng = thread_rng();
let mut indices = Vec::new();
let limit = 2usize.pow(10)/num_cpus::get();
for _ in (0..limit) {
while {
let count = *sample(&mut rng, 13..83, 1).first().unwrap();
indices = sample(&mut rng, 1..83, count);
tried_indices.contains(&indices)
}{}
tried_indices.push(indices.to_owned());
let current_set:Vec<_> = indices.iter().map(|&i|{
index.seek(i).unwrap();
index.records().next().unwrap().unwrap()
}).collect();
let current_count = mets_in_common(&mut csv,current_set.len());
if (current_count > best_count){
best_count = current_count;
best_set = current_set;
}
}
(best_count,best_set.iter().map(|r| *r.first().unwrap()).collect::<Vec<String>>())
})
}).collect();
}
特别是当我编译这个(rust 1.2 稳定版)时,我得到:
-*- mode: compilation; default-directory: "~/Developer/Rust/optimal_subset_finder/src/" -*-
Compilation started at Wed Aug 19 14:55:10
cargo build
Compiling optimal_subset_finder v0.1.0 (file:///Users/camdennarzt/Developer/Rust/optimal_subset_finder)
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31 thread::spawn(move || {
main.rs:32 let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33 let mut best_count = 0;
main.rs:34 let mut rng = thread_rng();
main.rs:35 let mut indices = Vec::new();
main.rs:36 let limit = 2usize.pow(10)/num_cpus::get();
...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31 thread::spawn(move || {
main.rs:32 let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33 let mut best_count = 0;
main.rs:34 let mut rng = thread_rng();
main.rs:35 let mut indices = Vec::new();
main.rs:36 let limit = 2usize.pow(10)/num_cpus::get();
...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31 thread::spawn(move || {
main.rs:32 let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33 let mut best_count = 0;
main.rs:34 let mut rng = thread_rng();
main.rs:35 let mut indices = Vec::new();
main.rs:36 let limit = 2usize.pow(10)/num_cpus::get();
...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:55:49: 55:68 error: cannot move out of borrowed content
main.rs:55 (best_count,best_set.iter().map(|r| *r.first().unwrap()).collect::<Vec<String>>())
^~~~~~~~~~~~~~~~~~~
note: in expansion of closure expansion
main.rs:55:45: 55:68 note: expansion site
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
error: aborting due to 4 previous errors
Could not compile `optimal_subset_finder`.
To learn more, run the command again with --verbose.
Compilation exited abnormally with code 101 at Wed Aug 19 14:55:10
如果我完全注释掉线程,那么它会编译并运行。但是我无法解析有关使线程正常工作的文档。在知道如何解决这个问题时,这些错误也不是特别有用。
你的代码有几个问题。
首先,您显示的实际错误是由于您试图从 &Vec<String>
中获取 String
。如果没有克隆,这是不可能的,因为它需要将 String
移出 &Vec<String>
,但您不能移出引用数据。由于您的 best_set
是在内线程中使用的,并且在您 return 来自它的数据之后立即被删除,您可以使用 into_iter()
:
best_set.into_iter()
.flat_map(|r| r.take(1))
.collect::<Vec<String>>()
然而,这是最少的问题。你在并发部分有很多错误。
首先,您试图在多线程中直接使用 index
。这是做不到的,因为 index
驻留在父线程栈中。父线程可以在其子线程之前终止(这确实是您的程序编译后会发生的情况),并且 index
在这种情况下将被销毁,因此子线程将访问垃圾数据。为了解决这个问题,您需要使用 Arc
with some kind of synchronization, like Mutex
。如果没有互斥量,您将在没有同步的情况下从多个线程写入访问相同的数据,这是一个会导致未定义行为的数据竞争的完美示例。
let index = Arc::new(Mutex::new(Indexed::open(csv_s(), index_data).unwrap()));
...
let index = index.clone();
thread::spawn(move || {
...
let current_set:Vec<_> = {
let index = index.lock();
indices.iter().map(|&i| {
index.seek(i).unwrap();
index.records().next().unwrap().unwrap()
}).collect()
};
...
});
您需要对 tried_indices
做同样的事情 - 您将来自多个线程的数据推送到同一个向量中,因此您需要某种同步来安全地完成它。您应该小心范围,以免锁定它的时间超过必要时间 - 请记住,当 return 由 lock()
方法执行的守卫超出范围时,互斥锁将被释放。
我能看到的最后一个问题,也是最严重的一个,是您从所有生成的线程中使用相同的 csv
。虽然这不仅因为可变访问而导致数据混乱,但它也是错误的,因为 reader 根据定义是一个可耗尽的数据源。如果你从多个线程读取它,即使不考虑并发问题,你也会从它读取的任何部分的不同部分获得绝对随机的数据。因此,即使将 reader 放入互斥量也无法解决问题。
我认为最简单的解决方案是为每个线程创建一个单独的 reader。幸运的是,您已经有一个创建 readers 的函数,因此只需在生成线程之前使用它:
let mut csv = csv_s();
thread::spawn(move || {
...
});
最后,您似乎没有使用线程 return 的计算结果,但您可能已经知道了。