Python - 多处理意外结果
Python - multiprocessing unexpected results
我有一些包含迭代器的代码,效果很好:
import multiprocessing
m = [0,1,2,3]
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
time.sleep(1)
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
def gen(t):
return t.next()
print gen(tt)
print gen(tt)
print gen(tt)
输出:
0
1
2
但是如果我尝试将它插入到并行进程中,我不会得到预期的结果:
import time
import multiprocessing
m = [0,1,2,3]
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
time.sleep(1)
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
def gen(t):
return t.next()
job1 = multiprocessing.Process(target=gen, args=(tt,))
print job1.start()
job2 = multiprocessing.Process(target=gen, args=(tt,))
print job2.start()
job3 = multiprocessing.Process(target=gen, args=(tt,))
print job3.start()
输出:
<None)>
<None)>
<None)>
我想不通,我怎么能通过并行使用这个迭代器。
有谁能够帮助我?
谢谢!
更新:
在@Anand S Kumar 非常有用的帮助下,我更新了我的代码,它工作正常,只是输出不明确,目前我正在尝试找出问题所在,也许它会受到另一个线程,也许 Anand 会帮助我 :)):
from threading import Thread, Lock
import time
m = [0,1,2,3]
starter = 0
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
time.sleep(1)
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
def f(t):
global starter
lock = Lock()
lock.acquire()
try:
starter = t.next()
finally:
lock.release()
t1 = Thread(target=f,args=(tt,))
t1.start()
t2 = Thread(target=f,args=(tt,))
t2.start()
t3 = Thread(target=f,args=(tt,))
t3.start()
t1.join()
print starter
t2.join()
print starter
t3.join()
print starter
不同的输出,相同的代码:
0
1
2
2
2
2
0
2
2
您正在尝试打印 job.start()
函数的 return 值,它没有 return 任何内容,因此它打印 None
.
不是打印 job.start()
的 return 值,也许您可以将打印语句移动到 gen(t)
函数中,例如 -
def gen(t):
print t.next()
然后是 运行 程序,不打印 job.start()
.
如果要从函数接收 return 值,可以使用多处理模块中的 Pool
。 [Documentation]
文档中的示例 -
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10))
但请注意,您实际上是在创建多个进程,而不是线程,它们不会共享全局变量。
我相信您想要的是 threads
,也许像下面这样的示例可以帮助您入门 -
from threading import Thread, Lock
m = [0,1,2,3]
starter = 0
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
def f(t):
global starter
lock = Lock()
lock.acquire()
try:
starter = t.next()
finally:
lock.release()
t1 = Thread(target=f,args=(tt,))
t1.start()
t2 = Thread(target=f,args=(tt,))
t2.start()
t1.join()
t2.join()
两个问题:
1) start()
函数没有 return 值,因此您将 None
打印出来。
2) 您将生成器对象传递给每个进程,从而 复制 原始 gener
对象(在主进程中声明)三次,一次到每个分叉进程的堆栈。因此,即使您将函数更改为:
def gen(t):
print t.next()
您要做的就是在每个单独的 gener
对象上第一次也是唯一一次调用 next()
,打印:
0
0
0
为了获得预期的效果,您需要在主进程中执行迭代,将其结果传递给每个派生进程:
job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
#print job1.start()
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
#print job2.start()
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))
#print job3.start()
那么你的 gen
函数需要做的就是 print
值:
def gen(t):
print t
你得到:
0
1
2
我有一些包含迭代器的代码,效果很好:
import multiprocessing
m = [0,1,2,3]
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
time.sleep(1)
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
def gen(t):
return t.next()
print gen(tt)
print gen(tt)
print gen(tt)
输出:
0 1 2
但是如果我尝试将它插入到并行进程中,我不会得到预期的结果:
import time
import multiprocessing
m = [0,1,2,3]
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
time.sleep(1)
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
def gen(t):
return t.next()
job1 = multiprocessing.Process(target=gen, args=(tt,))
print job1.start()
job2 = multiprocessing.Process(target=gen, args=(tt,))
print job2.start()
job3 = multiprocessing.Process(target=gen, args=(tt,))
print job3.start()
输出:
<None)> <None)> <None)>
我想不通,我怎么能通过并行使用这个迭代器。 有谁能够帮助我? 谢谢!
更新:
在@Anand S Kumar 非常有用的帮助下,我更新了我的代码,它工作正常,只是输出不明确,目前我正在尝试找出问题所在,也许它会受到另一个线程,也许 Anand 会帮助我 :)):
from threading import Thread, Lock
import time
m = [0,1,2,3]
starter = 0
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
time.sleep(1)
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
def f(t):
global starter
lock = Lock()
lock.acquire()
try:
starter = t.next()
finally:
lock.release()
t1 = Thread(target=f,args=(tt,))
t1.start()
t2 = Thread(target=f,args=(tt,))
t2.start()
t3 = Thread(target=f,args=(tt,))
t3.start()
t1.join()
print starter
t2.join()
print starter
t3.join()
print starter
不同的输出,相同的代码:
0 1 2 2 2 2 0 2 2
您正在尝试打印 job.start()
函数的 return 值,它没有 return 任何内容,因此它打印 None
.
不是打印 job.start()
的 return 值,也许您可以将打印语句移动到 gen(t)
函数中,例如 -
def gen(t):
print t.next()
然后是 运行 程序,不打印 job.start()
.
如果要从函数接收 return 值,可以使用多处理模块中的 Pool
。 [Documentation]
文档中的示例 -
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10))
但请注意,您实际上是在创建多个进程,而不是线程,它们不会共享全局变量。
我相信您想要的是 threads
,也许像下面这样的示例可以帮助您入门 -
from threading import Thread, Lock
m = [0,1,2,3]
starter = 0
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
def f(t):
global starter
lock = Lock()
lock.acquire()
try:
starter = t.next()
finally:
lock.release()
t1 = Thread(target=f,args=(tt,))
t1.start()
t2 = Thread(target=f,args=(tt,))
t2.start()
t1.join()
t2.join()
两个问题:
1) start()
函数没有 return 值,因此您将 None
打印出来。
2) 您将生成器对象传递给每个进程,从而 复制 原始 gener
对象(在主进程中声明)三次,一次到每个分叉进程的堆栈。因此,即使您将函数更改为:
def gen(t):
print t.next()
您要做的就是在每个单独的 gener
对象上第一次也是唯一一次调用 next()
,打印:
0
0
0
为了获得预期的效果,您需要在主进程中执行迭代,将其结果传递给每个派生进程:
job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
#print job1.start()
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
#print job2.start()
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))
#print job3.start()
那么你的 gen
函数需要做的就是 print
值:
def gen(t):
print t
你得到:
0
1
2