实时共享变量。多进程 Python

Share variables in real time. Multiprocess Python

我有 2 个进程:进程 A 生成数据,进程 B 对进程 A 的值执行数学计算。该进程通过 Python 中的多进程库进行同步,如下面的代码所示。到目前为止,我共享变量的方式是通过提供 Multiprocess 库的数据类型 valueArray。如果我了解这些数据类型是如何工作的,则必须等待进程完成使用共享变量,因此,通过这种方式我明白我不能立即使用进程 A 生成的新值(或数组)。

生成数据的进程A为:

def process_A(processid, textid, texttitle, values):
    
    iters = 10
    
    h1 = new_data_incoming()
    data  = h1.text.split(" ")

    p_var1, p_var2 = 0, 0
    i = 0
    while i < iters:
        
        a_var1 = float(data[1].replace(',',''))
        a_var2 = float(data[1].replace(',',''))
        if a_var1 != p_var1:
            values[0] = a_var1
            values[1] = a_var2
            values[2] = float(data[3].replace(',',''))
            values[3] = float(data[4].replace(',',''))
            
            if data[5][0] == "+":
                values[4] = float(data[5][1::])
            else:
                values[4] = -float(data[5][1::])
                
            if data[6][0] == "+":
                values[5] = float(data[6][1:-1])
            else:
                values[5] = -float(data[6][1:-1])
            
            p_var1 = a_var1
            p_var2 = a_var2
            i = i + 1

控制进程A启动并接收数据的进程B是:

def process_B():
    ids = getids()
    
    key2 = "stocastic_process"
    
    id2  = ids[key2]
    
    values = Array('d', range(6))
    
    process_A_launched = Process(target=process_A, args=(2, id2, key2, values2))
    
    process_A_launched.start()
    print("Process with id %i has started: stocastic %s" % (2, key2))
    
    process_A_launched.join(timeout=0)
    
    finished = False
    fnsh = False
    
    while finished == False:
            
        if not process_A_launched.is_alive() and fnsh == False:
            print("process %i has finished!" % (2))
            fnsh = True
            
        if fnsh == True:
            finished = True
    
    print("All process have finished!")

    do_somethig_with_values(values)

所以,我想尽快使用新生成的数据。总是进程A产生了一个新数据,在进程B中使用它,因为我的意图是进程A一直是运行。也许我需要使用管道?

在 C 中,我在 POSIX 中使用了管道和中断,但我在 Python 中不知道这些。

谢谢!

使用阻塞互斥锁 (Lock()) 处理线程安全问题,或者使用队列,具体取决于您必须发送的数据量。

使用互斥锁

from multiprocessing import Lock
from time import sleep
from collections import dequeue
data = dequeue()
lock = Lock()

def producer():
    with lock:
        data.append(stuff)
    ...
    with lock:
        data.append(stuff)

def launcher_consumer():
    start_producer()
    while not len(data):
        sleep(0.001)
    with lock:
        do_stuff(data.pop_left())

但是到目前为止...我们几乎已经重新实现了一个队列。如果我要发送一位(或两位)数据,我只会使用互斥体。 (在第一种情况下我不会使用 dequeue,只是一个普通的 var。)

使用队列

from multiprocessing import Queue

data = Queue()

def producer():
    data.put(stuff)
    ...
    data.put(stuff)

def launcher_consumer():
    start_producer()
    val = data.get(timeout=None) # set timeout if you need it
    do_stuff(val)
    val = data.get()
    do_stuff(val)

有关任一解决方案的更多信息,请参阅多处理文档。

参考资料

https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes