关于在 Python 中的一些线程之间共享变量

On the sharing a variable between some threads in Python

作为创建更复杂代码的 MWE 的尝试,我有一个函数 do(),它应该由两个线程 运行。共享变量 temp 应该由两个线程使用队列模块带来的来操作。这是代码:

import random
from threading import Thread
from queue import Queue

epochs = 2

success = [0 for x in range(epochs)]

def do(temp,q):
    temp = q.get()
    if random.random() > 0.5:
        temp += 1
    print("Teh current temp is {0}".format(temp))
    return q.put(temp)

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    temp = 0
    q = Queue()
    Thread(target=do, args=(temp,q)).start()
    Thread(target=do, args=(temp,q)).start()
    success[epoch] = temp

然而输出只是

iteration: 0
iteration: 1

显然,即使 do()print 推荐也没有返回。

有人可以解决我在这里做错了什么吗?

因为threads使用相同的内存,所以你可以像在普通程序中一样使用全局变量

import random
from threading import Thread

epochs = 2

success = [0] * epochs

def do():
    global temp
    
    if random.random() > 0.5:
        print('  add')
        temp += 1

    print("Teh current temp is {0}".format(temp))


for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    temp = 0
    
    t1 = Thread(target=do)
    t2 = Thread(target=do)

    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    success[epoch] = temp

您最终可以在访问共享变量时使用 Lock() 来阻塞其他线程。最后你可以发送 lock 作为参数。但是因为Python不能运行同时两个线程所以不知道是不是真的需要

import random
from threading import Thread
from threading import Lock

epochs = 2

success = [0] * epochs

def do(lock):
    global temp
    
    if random.random() > 0.5:
        print('  add')
        lock.acquire()  # block other threads
        temp += 1
        lock.release()  # unblock other threads
        
    print("Teh current temp is {0}".format(temp))

lock = Lock()

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    temp = 0
    
    t1 = Thread(target=do, args=(lock,))
    t2 = Thread(target=do, args=(lock,))

    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    success[epoch] = temp

如果你只需要添加一些值而不显示那么我宁愿使用 queue01 发送到主线程并将其添加到 temp 在主线程中。

import random
from threading import Thread
from queue import Queue

epochs = 2

success = [0] * epochs

def do(q):
    if random.random() > 0.5:
        q.put(1)
    else:
        q.put(0)

q = Queue()

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    temp = 0
    
    t1 = Thread(target=do, args=(q,))
    t2 = Thread(target=do, args=(q,))

    t1.start()
    t2.start()
    
    temp += q.get()
    temp += q.get()
    
    t1.join()
    t2.join()
    
    success[epoch] = temp
    
    print(temp)

并且此方法也适用于 multiprocesskingrayjoblib


编辑:

最新版本ray。我在 epoch

中使用更大的 epochs 和更多的进程
import random
import ray


ray.init()

@ray.remote
def do():
    if random.random() > 0.5:
        print('do: 1')
        return 1
    else:
        print('do: 0')
        return 0

epochs = 5

success = [0] * epochs

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    results = ray.get([do.remote() for _ in range(5)])
    
    temp = sum(results)
    
    success[epoch] = temp
    
    print('Temp:', temp)

最新版本 joblib

import random
from joblib import Parallel, delayed


def do():
    if random.random() > 0.5:
        print('do: 1')
        return 1
    else:
        print('do: 0')
        return 0

epochs = 5

success = [0] * epochs

pool = Parallel(n_jobs=3)

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    #results = Parallel(n_jobs=3)(delayed(do)() for _ in range(5))
    results = pool(delayed(do)() for _ in range(5))
    
    temp = sum(results)
    
    success[epoch] = temp
    
    print('Temp:', temp)