Python 中并行处理的更好示例
Better examples of Parallel processing in Python
希望这次我没有被低估。我一直在努力处理 Python 中的并行处理一段时间(正好是 2 天)。我已经检查了这些资源(此处显示了部分列表:
(a) http://eli.thegreenplace.net/2013/01/16/python-paralellizing-cpu-bound-tasks-with-concurrent-futures
(b) https://pythonadventures.wordpress.com/tag/processpoolexecutor/
我解脱了。我想做的是:
大师:
Break up the file into chunks(strings or numbers)
Broadcast a pattern to be searched to all the workers
Receive the offsets in the file where the pattern was found
工人:
Receive pattern and chunk of text from the master
Compute()
Send back the offsets to the master.
我尝试使用 MPI/concurrent 来实现它。futures/multiprocessing 但失败了。
我使用多处理模块的简单实现
import multiprocessing
filename = "file1.txt"
pat = "afow"
N = 1000
""" This is the naive string search algorithm"""
def search(pat, txt):
patLen = len(pat)
txtLen = len(txt)
offsets = []
# A loop to slide pattern[] one by one
# Range generates numbers up to but not including that number
for i in range ((txtLen - patLen) + 1):
# Can not use a for loop here
# For loops in C with && statements must be
# converted to while statements in python
counter = 0
while(counter < patLen) and pat[counter] == txt[counter + i]:
counter += 1
if counter >= patLen:
offsets.append(i)
return str(offsets).strip('[]')
""""
This is what I want
if __name__ == "__main__":
tasks = []
pool_outputs = []
pool = multiprocessing.Pool(processes=5)
with open(filename, 'r') as infile:
lines = []
for line in infile:
lines.append(line.rstrip())
if len(lines) > N:
pool_output = pool.map(search, tasks)
pool_outputs.append(pool_output)
lines = []
if len(lines) > 0:
pool_output = pool.map(search, tasks)
pool_outputs.append(pool_output)
pool.close()
pool.join()
print('Pool:', pool_outputs)
"""""
with open(filename, 'r') as infile:
for line in infile:
print(search(pat, line))
我将不胜感激任何指导,尤其是 concurrent.futures。谢谢你的时间。 Valeriy 帮助我完成了他的添加,我为此感谢他。
但是,如果有人可以让我稍等片刻,这就是我为 concurrent.futures(处理我在某处看到的一个示例)
编写的代码
from concurrent.futures import ProcessPoolExecutor, as_completed
import math
def search(pat, txt):
patLen = len(pat)
txtLen = len(txt)
offsets = []
# A loop to slide pattern[] one by one
# Range generates numbers up to but not including that number
for i in range ((txtLen - patLen) + 1):
# Can not use a for loop here
# For loops in C with && statements must be
# converted to while statements in python
counter = 0
while(counter < patLen) and pat[counter] == txt[counter + i]:
counter += 1
if counter >= patLen:
offsets.append(i)
return str(offsets).strip('[]')
#Check a list of strings
def chunked_worker(lines):
return {0: search("fmo", line) for line in lines}
def pool_bruteforce(filename, nprocs):
lines = []
with open(filename) as f:
lines = [line.rstrip('\n') for line in f]
chunksize = int(math.ceil(len(lines) / float(nprocs)))
futures = []
with ProcessPoolExecutor() as executor:
for i in range(nprocs):
chunk = lines[(chunksize * i): (chunksize * (i + 1))]
futures.append(executor.submit(chunked_worker, chunk))
resultdict = {}
for f in as_completed(futures):
resultdict.update(f.result())
return resultdict
filename = "file1.txt"
pool_bruteforce(filename, 5)
再次感谢 Valeriy 和任何试图帮助我解决我的谜语的人。
您使用了多个参数,因此:
import multiprocessing
from functools import partial
filename = "file1.txt"
pat = "afow"
N = 1000
""" This is the naive string search algorithm"""
def search(pat, txt):
patLen = len(pat)
txtLen = len(txt)
offsets = []
# A loop to slide pattern[] one by one
# Range generates numbers up to but not including that number
for i in range ((txtLen - patLen) + 1):
# Can not use a for loop here
# For loops in C with && statements must be
# converted to while statements in python
counter = 0
while(counter < patLen) and pat[counter] == txt[counter + i]:
counter += 1
if counter >= patLen:
offsets.append(i)
return str(offsets).strip('[]')
if __name__ == "__main__":
tasks = []
pool_outputs = []
pool = multiprocessing.Pool(processes=5)
lines = []
with open(filename, 'r') as infile:
for line in infile:
lines.append(line.rstrip())
tasks = lines
func = partial(search, pat)
if len(lines) > N:
pool_output = pool.map(func, lines )
pool_outputs.append(pool_output)
elif len(lines) > 0:
pool_output = pool.map(func, lines )
pool_outputs.append(pool_output)
pool.close()
pool.join()
print('Pool:', pool_outputs)
希望这次我没有被低估。我一直在努力处理 Python 中的并行处理一段时间(正好是 2 天)。我已经检查了这些资源(此处显示了部分列表:
(a) http://eli.thegreenplace.net/2013/01/16/python-paralellizing-cpu-bound-tasks-with-concurrent-futures
(b) https://pythonadventures.wordpress.com/tag/processpoolexecutor/
我解脱了。我想做的是:
大师:
Break up the file into chunks(strings or numbers)
Broadcast a pattern to be searched to all the workers
Receive the offsets in the file where the pattern was found
工人:
Receive pattern and chunk of text from the master
Compute()
Send back the offsets to the master.
我尝试使用 MPI/concurrent 来实现它。futures/multiprocessing 但失败了。
我使用多处理模块的简单实现
import multiprocessing
filename = "file1.txt"
pat = "afow"
N = 1000
""" This is the naive string search algorithm"""
def search(pat, txt):
patLen = len(pat)
txtLen = len(txt)
offsets = []
# A loop to slide pattern[] one by one
# Range generates numbers up to but not including that number
for i in range ((txtLen - patLen) + 1):
# Can not use a for loop here
# For loops in C with && statements must be
# converted to while statements in python
counter = 0
while(counter < patLen) and pat[counter] == txt[counter + i]:
counter += 1
if counter >= patLen:
offsets.append(i)
return str(offsets).strip('[]')
""""
This is what I want
if __name__ == "__main__":
tasks = []
pool_outputs = []
pool = multiprocessing.Pool(processes=5)
with open(filename, 'r') as infile:
lines = []
for line in infile:
lines.append(line.rstrip())
if len(lines) > N:
pool_output = pool.map(search, tasks)
pool_outputs.append(pool_output)
lines = []
if len(lines) > 0:
pool_output = pool.map(search, tasks)
pool_outputs.append(pool_output)
pool.close()
pool.join()
print('Pool:', pool_outputs)
"""""
with open(filename, 'r') as infile:
for line in infile:
print(search(pat, line))
我将不胜感激任何指导,尤其是 concurrent.futures。谢谢你的时间。 Valeriy 帮助我完成了他的添加,我为此感谢他。
但是,如果有人可以让我稍等片刻,这就是我为 concurrent.futures(处理我在某处看到的一个示例)
编写的代码from concurrent.futures import ProcessPoolExecutor, as_completed
import math
def search(pat, txt):
patLen = len(pat)
txtLen = len(txt)
offsets = []
# A loop to slide pattern[] one by one
# Range generates numbers up to but not including that number
for i in range ((txtLen - patLen) + 1):
# Can not use a for loop here
# For loops in C with && statements must be
# converted to while statements in python
counter = 0
while(counter < patLen) and pat[counter] == txt[counter + i]:
counter += 1
if counter >= patLen:
offsets.append(i)
return str(offsets).strip('[]')
#Check a list of strings
def chunked_worker(lines):
return {0: search("fmo", line) for line in lines}
def pool_bruteforce(filename, nprocs):
lines = []
with open(filename) as f:
lines = [line.rstrip('\n') for line in f]
chunksize = int(math.ceil(len(lines) / float(nprocs)))
futures = []
with ProcessPoolExecutor() as executor:
for i in range(nprocs):
chunk = lines[(chunksize * i): (chunksize * (i + 1))]
futures.append(executor.submit(chunked_worker, chunk))
resultdict = {}
for f in as_completed(futures):
resultdict.update(f.result())
return resultdict
filename = "file1.txt"
pool_bruteforce(filename, 5)
再次感谢 Valeriy 和任何试图帮助我解决我的谜语的人。
您使用了多个参数,因此:
import multiprocessing
from functools import partial
filename = "file1.txt"
pat = "afow"
N = 1000
""" This is the naive string search algorithm"""
def search(pat, txt):
patLen = len(pat)
txtLen = len(txt)
offsets = []
# A loop to slide pattern[] one by one
# Range generates numbers up to but not including that number
for i in range ((txtLen - patLen) + 1):
# Can not use a for loop here
# For loops in C with && statements must be
# converted to while statements in python
counter = 0
while(counter < patLen) and pat[counter] == txt[counter + i]:
counter += 1
if counter >= patLen:
offsets.append(i)
return str(offsets).strip('[]')
if __name__ == "__main__":
tasks = []
pool_outputs = []
pool = multiprocessing.Pool(processes=5)
lines = []
with open(filename, 'r') as infile:
for line in infile:
lines.append(line.rstrip())
tasks = lines
func = partial(search, pat)
if len(lines) > N:
pool_output = pool.map(func, lines )
pool_outputs.append(pool_output)
elif len(lines) > 0:
pool_output = pool.map(func, lines )
pool_outputs.append(pool_output)
pool.close()
pool.join()
print('Pool:', pool_outputs)