关于在 Python 中的一些线程之间共享变量
On the sharing a variable between some threads in Python
作为创建更复杂代码的 MWE 的尝试,我有一个函数 do()
,它应该由两个线程 运行。共享变量 temp
应该由两个线程使用队列模块带来的来操作。这是代码:
import random
from threading import Thread
from queue import Queue
epochs = 2
success = [0 for x in range(epochs)]
def do(temp,q):
temp = q.get()
if random.random() > 0.5:
temp += 1
print("Teh current temp is {0}".format(temp))
return q.put(temp)
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
temp = 0
q = Queue()
Thread(target=do, args=(temp,q)).start()
Thread(target=do, args=(temp,q)).start()
success[epoch] = temp
然而输出只是
iteration: 0
iteration: 1
显然,即使 do()
的 print
推荐也没有返回。
有人可以解决我在这里做错了什么吗?
因为threads
使用相同的内存,所以你可以像在普通程序中一样使用全局变量
import random
from threading import Thread
epochs = 2
success = [0] * epochs
def do():
global temp
if random.random() > 0.5:
print(' add')
temp += 1
print("Teh current temp is {0}".format(temp))
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
temp = 0
t1 = Thread(target=do)
t2 = Thread(target=do)
t1.start()
t2.start()
t1.join()
t2.join()
success[epoch] = temp
您最终可以在访问共享变量时使用 Lock()
来阻塞其他线程。最后你可以发送 lock
作为参数。但是因为Python不能运行同时两个线程所以不知道是不是真的需要
import random
from threading import Thread
from threading import Lock
epochs = 2
success = [0] * epochs
def do(lock):
global temp
if random.random() > 0.5:
print(' add')
lock.acquire() # block other threads
temp += 1
lock.release() # unblock other threads
print("Teh current temp is {0}".format(temp))
lock = Lock()
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
temp = 0
t1 = Thread(target=do, args=(lock,))
t2 = Thread(target=do, args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
success[epoch] = temp
如果你只需要添加一些值而不显示那么我宁愿使用 queue
将 0
或 1
发送到主线程并将其添加到 temp
在主线程中。
import random
from threading import Thread
from queue import Queue
epochs = 2
success = [0] * epochs
def do(q):
if random.random() > 0.5:
q.put(1)
else:
q.put(0)
q = Queue()
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
temp = 0
t1 = Thread(target=do, args=(q,))
t2 = Thread(target=do, args=(q,))
t1.start()
t2.start()
temp += q.get()
temp += q.get()
t1.join()
t2.join()
success[epoch] = temp
print(temp)
并且此方法也适用于 multiprocessking
、ray
、joblib
等
编辑:
最新版本ray
。我在 epoch
中使用更大的 epochs 和更多的进程
import random
import ray
ray.init()
@ray.remote
def do():
if random.random() > 0.5:
print('do: 1')
return 1
else:
print('do: 0')
return 0
epochs = 5
success = [0] * epochs
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
results = ray.get([do.remote() for _ in range(5)])
temp = sum(results)
success[epoch] = temp
print('Temp:', temp)
最新版本 joblib
import random
from joblib import Parallel, delayed
def do():
if random.random() > 0.5:
print('do: 1')
return 1
else:
print('do: 0')
return 0
epochs = 5
success = [0] * epochs
pool = Parallel(n_jobs=3)
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
#results = Parallel(n_jobs=3)(delayed(do)() for _ in range(5))
results = pool(delayed(do)() for _ in range(5))
temp = sum(results)
success[epoch] = temp
print('Temp:', temp)
作为创建更复杂代码的 MWE 的尝试,我有一个函数 do()
,它应该由两个线程 运行。共享变量 temp
应该由两个线程使用队列模块带来的来操作。这是代码:
import random
from threading import Thread
from queue import Queue
epochs = 2
success = [0 for x in range(epochs)]
def do(temp,q):
temp = q.get()
if random.random() > 0.5:
temp += 1
print("Teh current temp is {0}".format(temp))
return q.put(temp)
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
temp = 0
q = Queue()
Thread(target=do, args=(temp,q)).start()
Thread(target=do, args=(temp,q)).start()
success[epoch] = temp
然而输出只是
iteration: 0
iteration: 1
显然,即使 do()
的 print
推荐也没有返回。
有人可以解决我在这里做错了什么吗?
因为threads
使用相同的内存,所以你可以像在普通程序中一样使用全局变量
import random
from threading import Thread
epochs = 2
success = [0] * epochs
def do():
global temp
if random.random() > 0.5:
print(' add')
temp += 1
print("Teh current temp is {0}".format(temp))
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
temp = 0
t1 = Thread(target=do)
t2 = Thread(target=do)
t1.start()
t2.start()
t1.join()
t2.join()
success[epoch] = temp
您最终可以在访问共享变量时使用 Lock()
来阻塞其他线程。最后你可以发送 lock
作为参数。但是因为Python不能运行同时两个线程所以不知道是不是真的需要
import random
from threading import Thread
from threading import Lock
epochs = 2
success = [0] * epochs
def do(lock):
global temp
if random.random() > 0.5:
print(' add')
lock.acquire() # block other threads
temp += 1
lock.release() # unblock other threads
print("Teh current temp is {0}".format(temp))
lock = Lock()
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
temp = 0
t1 = Thread(target=do, args=(lock,))
t2 = Thread(target=do, args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
success[epoch] = temp
如果你只需要添加一些值而不显示那么我宁愿使用 queue
将 0
或 1
发送到主线程并将其添加到 temp
在主线程中。
import random
from threading import Thread
from queue import Queue
epochs = 2
success = [0] * epochs
def do(q):
if random.random() > 0.5:
q.put(1)
else:
q.put(0)
q = Queue()
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
temp = 0
t1 = Thread(target=do, args=(q,))
t2 = Thread(target=do, args=(q,))
t1.start()
t2.start()
temp += q.get()
temp += q.get()
t1.join()
t2.join()
success[epoch] = temp
print(temp)
并且此方法也适用于 multiprocessking
、ray
、joblib
等
编辑:
最新版本ray
。我在 epoch
import random
import ray
ray.init()
@ray.remote
def do():
if random.random() > 0.5:
print('do: 1')
return 1
else:
print('do: 0')
return 0
epochs = 5
success = [0] * epochs
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
results = ray.get([do.remote() for _ in range(5)])
temp = sum(results)
success[epoch] = temp
print('Temp:', temp)
最新版本 joblib
import random
from joblib import Parallel, delayed
def do():
if random.random() > 0.5:
print('do: 1')
return 1
else:
print('do: 0')
return 0
epochs = 5
success = [0] * epochs
pool = Parallel(n_jobs=3)
for epoch in range(epochs):
print("iteration: {0}".format(epoch))
#results = Parallel(n_jobs=3)(delayed(do)() for _ in range(5))
results = pool(delayed(do)() for _ in range(5))
temp = sum(results)
success[epoch] = temp
print('Temp:', temp)