尝试 运行 进程时出现 pickle 错误
Getting a pickle error when trying to run processes
我想做的是运行一次在不同过程中分解素数的列表。我有一个正在运行的线程版本,但似乎无法让它与进程一起工作。
import math
from Queue import Queue
import multiprocessing
def primes2(n):
primfac = []
num = n
d = 2
while d * d <= n:
while (n % d) == 0:
primfac.append(d) # supposing you want multiple factors repeated
n //= d
d += 1
if n > 1:
primfac.append(n)
myfile = open('processresults.txt', 'a')
myfile.write(str(num) + ":" + str(primfac) + "\n")
return primfac
def mp_factorizer(nums, nprocs):
def worker(nums, out_q):
""" The worker function, invoked in a process. 'nums' is a
list of numbers to factor. The results are placed in
a dictionary that's pushed to a queue.
"""
outdict = {}
for n in nums:
outdict[n] = primes2(n)
out_q.put(outdict)
# Each process will get 'chunksize' nums and a queue to put his out
# dict into
out_q = Queue()
chunksize = int(math.ceil(len(nums) / float(nprocs)))
procs = []
for i in range(nprocs):
p = multiprocessing.Process(
target=worker,
args=(nums[chunksize * i:chunksize * (i + 1)],
out_q))
procs.append(p)
p.start()
# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict = {}
for i in range(nprocs):
resultdict.update(out_q.get())
# Wait for all worker processes to finish
for p in procs:
p.join()
print resultdict
if __name__ == '__main__':
mp_factorizer((400243534500, 100345345000, 600034522000, 9000045346435345000), 4)
我收到如下所示的 pickle 错误:
任何帮助将不胜感激:)
您需要使用 multiprocessing.Queue
而不是常规的 Queue
。 +more
这是因为进程没有运行使用相同的内存space并且有一些对象不是pickable,就像 常规队列 (Queue.Queue
)。为了克服这个问题,multiprocessing
库提供了一个 Queue
class,它实际上是一个 Proxy
到队列。
此外,您可以像任何其他方法一样提取 def worker(..
。这可能是您的主要问题,因为在 "how" 上,进程在 OS 级别上分叉。
您也可以使用 multiprocessing.Manager
+more.
动态创建的函数不能被 pickle,因此不能用作 Process
的目标,函数 worker
需要在全局范围内而不是在 [= 的定义中定义12=].
我想做的是运行一次在不同过程中分解素数的列表。我有一个正在运行的线程版本,但似乎无法让它与进程一起工作。
import math
from Queue import Queue
import multiprocessing
def primes2(n):
primfac = []
num = n
d = 2
while d * d <= n:
while (n % d) == 0:
primfac.append(d) # supposing you want multiple factors repeated
n //= d
d += 1
if n > 1:
primfac.append(n)
myfile = open('processresults.txt', 'a')
myfile.write(str(num) + ":" + str(primfac) + "\n")
return primfac
def mp_factorizer(nums, nprocs):
def worker(nums, out_q):
""" The worker function, invoked in a process. 'nums' is a
list of numbers to factor. The results are placed in
a dictionary that's pushed to a queue.
"""
outdict = {}
for n in nums:
outdict[n] = primes2(n)
out_q.put(outdict)
# Each process will get 'chunksize' nums and a queue to put his out
# dict into
out_q = Queue()
chunksize = int(math.ceil(len(nums) / float(nprocs)))
procs = []
for i in range(nprocs):
p = multiprocessing.Process(
target=worker,
args=(nums[chunksize * i:chunksize * (i + 1)],
out_q))
procs.append(p)
p.start()
# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict = {}
for i in range(nprocs):
resultdict.update(out_q.get())
# Wait for all worker processes to finish
for p in procs:
p.join()
print resultdict
if __name__ == '__main__':
mp_factorizer((400243534500, 100345345000, 600034522000, 9000045346435345000), 4)
我收到如下所示的 pickle 错误:
任何帮助将不胜感激:)
您需要使用 multiprocessing.Queue
而不是常规的 Queue
。 +more
这是因为进程没有运行使用相同的内存space并且有一些对象不是pickable,就像 常规队列 (Queue.Queue
)。为了克服这个问题,multiprocessing
库提供了一个 Queue
class,它实际上是一个 Proxy
到队列。
此外,您可以像任何其他方法一样提取 def worker(..
。这可能是您的主要问题,因为在 "how" 上,进程在 OS 级别上分叉。
您也可以使用 multiprocessing.Manager
+more.
动态创建的函数不能被 pickle,因此不能用作 Process
的目标,函数 worker
需要在全局范围内而不是在 [= 的定义中定义12=].