实时共享变量。多进程 Python
Share variables in real time. Multiprocess Python
我有 2 个进程:进程 A
生成数据,进程 B
对进程 A
的值执行数学计算。该进程通过 Python 中的多进程库进行同步,如下面的代码所示。到目前为止,我共享变量的方式是通过提供 Multiprocess
库的数据类型 value
或 Array
。如果我了解这些数据类型是如何工作的,则必须等待进程完成使用共享变量,因此,通过这种方式我明白我不能立即使用进程 A 生成的新值(或数组)。
生成数据的进程A为:
def process_A(processid, textid, texttitle, values):
iters = 10
h1 = new_data_incoming()
data = h1.text.split(" ")
p_var1, p_var2 = 0, 0
i = 0
while i < iters:
a_var1 = float(data[1].replace(',',''))
a_var2 = float(data[1].replace(',',''))
if a_var1 != p_var1:
values[0] = a_var1
values[1] = a_var2
values[2] = float(data[3].replace(',',''))
values[3] = float(data[4].replace(',',''))
if data[5][0] == "+":
values[4] = float(data[5][1::])
else:
values[4] = -float(data[5][1::])
if data[6][0] == "+":
values[5] = float(data[6][1:-1])
else:
values[5] = -float(data[6][1:-1])
p_var1 = a_var1
p_var2 = a_var2
i = i + 1
控制进程A启动并接收数据的进程B是:
def process_B():
ids = getids()
key2 = "stocastic_process"
id2 = ids[key2]
values = Array('d', range(6))
process_A_launched = Process(target=process_A, args=(2, id2, key2, values2))
process_A_launched.start()
print("Process with id %i has started: stocastic %s" % (2, key2))
process_A_launched.join(timeout=0)
finished = False
fnsh = False
while finished == False:
if not process_A_launched.is_alive() and fnsh == False:
print("process %i has finished!" % (2))
fnsh = True
if fnsh == True:
finished = True
print("All process have finished!")
do_somethig_with_values(values)
所以,我想尽快使用新生成的数据。总是进程A产生了一个新数据,在进程B中使用它,因为我的意图是进程A一直是运行。也许我需要使用管道?
在 C 中,我在 POSIX 中使用了管道和中断,但我在 Python 中不知道这些。
谢谢!
使用阻塞互斥锁 (Lock()
) 处理线程安全问题,或者使用队列,具体取决于您必须发送的数据量。
使用互斥锁
from multiprocessing import Lock
from time import sleep
from collections import dequeue
data = dequeue()
lock = Lock()
def producer():
with lock:
data.append(stuff)
...
with lock:
data.append(stuff)
def launcher_consumer():
start_producer()
while not len(data):
sleep(0.001)
with lock:
do_stuff(data.pop_left())
但是到目前为止...我们几乎已经重新实现了一个队列。如果我要发送一位(或两位)数据,我只会使用互斥体。 (在第一种情况下我不会使用 dequeue,只是一个普通的 var。)
使用队列
from multiprocessing import Queue
data = Queue()
def producer():
data.put(stuff)
...
data.put(stuff)
def launcher_consumer():
start_producer()
val = data.get(timeout=None) # set timeout if you need it
do_stuff(val)
val = data.get()
do_stuff(val)
有关任一解决方案的更多信息,请参阅多处理文档。
参考资料
https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes
https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes
我有 2 个进程:进程 A
生成数据,进程 B
对进程 A
的值执行数学计算。该进程通过 Python 中的多进程库进行同步,如下面的代码所示。到目前为止,我共享变量的方式是通过提供 Multiprocess
库的数据类型 value
或 Array
。如果我了解这些数据类型是如何工作的,则必须等待进程完成使用共享变量,因此,通过这种方式我明白我不能立即使用进程 A 生成的新值(或数组)。
生成数据的进程A为:
def process_A(processid, textid, texttitle, values):
iters = 10
h1 = new_data_incoming()
data = h1.text.split(" ")
p_var1, p_var2 = 0, 0
i = 0
while i < iters:
a_var1 = float(data[1].replace(',',''))
a_var2 = float(data[1].replace(',',''))
if a_var1 != p_var1:
values[0] = a_var1
values[1] = a_var2
values[2] = float(data[3].replace(',',''))
values[3] = float(data[4].replace(',',''))
if data[5][0] == "+":
values[4] = float(data[5][1::])
else:
values[4] = -float(data[5][1::])
if data[6][0] == "+":
values[5] = float(data[6][1:-1])
else:
values[5] = -float(data[6][1:-1])
p_var1 = a_var1
p_var2 = a_var2
i = i + 1
控制进程A启动并接收数据的进程B是:
def process_B():
ids = getids()
key2 = "stocastic_process"
id2 = ids[key2]
values = Array('d', range(6))
process_A_launched = Process(target=process_A, args=(2, id2, key2, values2))
process_A_launched.start()
print("Process with id %i has started: stocastic %s" % (2, key2))
process_A_launched.join(timeout=0)
finished = False
fnsh = False
while finished == False:
if not process_A_launched.is_alive() and fnsh == False:
print("process %i has finished!" % (2))
fnsh = True
if fnsh == True:
finished = True
print("All process have finished!")
do_somethig_with_values(values)
所以,我想尽快使用新生成的数据。总是进程A产生了一个新数据,在进程B中使用它,因为我的意图是进程A一直是运行。也许我需要使用管道?
在 C 中,我在 POSIX 中使用了管道和中断,但我在 Python 中不知道这些。
谢谢!
使用阻塞互斥锁 (Lock()
) 处理线程安全问题,或者使用队列,具体取决于您必须发送的数据量。
使用互斥锁
from multiprocessing import Lock
from time import sleep
from collections import dequeue
data = dequeue()
lock = Lock()
def producer():
with lock:
data.append(stuff)
...
with lock:
data.append(stuff)
def launcher_consumer():
start_producer()
while not len(data):
sleep(0.001)
with lock:
do_stuff(data.pop_left())
但是到目前为止...我们几乎已经重新实现了一个队列。如果我要发送一位(或两位)数据,我只会使用互斥体。 (在第一种情况下我不会使用 dequeue,只是一个普通的 var。)
使用队列
from multiprocessing import Queue
data = Queue()
def producer():
data.put(stuff)
...
data.put(stuff)
def launcher_consumer():
start_producer()
val = data.get(timeout=None) # set timeout if you need it
do_stuff(val)
val = data.get()
do_stuff(val)
有关任一解决方案的更多信息,请参阅多处理文档。
参考资料
https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes