如何将相同的队列放入不同的多处理文件中?
How do I get the same queue into differnet multiprocessing files?
我看到很多关于如何使用队列的教程,但它们总是显示它们在同一个文件中实现。我试图从一开始就组织好我的代码文件,因为我预计项目会变得非常大。如何让我在主文件中初始化的队列导入到其他函数文件中?
这是我的主要文件:
import multiprocessing
import queue
from data_handler import data_handler
from get_info import get_memory_info
from get_info import get_cpu_info
if __name__ == '__main__':
q = queue.Queue()
getDataHandlerProcess = multiprocessing.Process(target=data_handler(q))
getMemoryInfoProcess = multiprocessing.Process(target=get_memory_info(q))
getCPUInfoProcess = multiprocessing.Process(target=get_cpu_info(q))
getDataHandlerProcess.start()
getMemoryInfoProcess.start()
getCPUInfoProcess.start()
print("DEBUG: All tasks successfully started.")
这是我的制作人:
import psutil
import struct
import time
from data_frame import build_frame
def get_cpu_info(q):
while True:
cpu_string_data = bytes('', 'utf-8')
cpu_times = psutil.cpu_percent(interval=0.0, percpu=True)
for item in cpu_times:
cpu_string_data = cpu_string_data + struct.pack('<d',item)
cpu_frame = build_frame(cpu_string_data, 0, 0, -1, -1)
q.put(cpu_frame)
print(cpu_frame)
time.sleep(1.000)
def get_memory_info(q):
while True:
memory_string_data = bytes('', 'utf-8')
virtual_memory = psutil.virtual_memory()
swap_memory = psutil.swap_memory()
memory_info = list(virtual_memory+swap_memory)
for item in memory_info:
memory_string_data = memory_string_data + struct.pack('<d',item)
memory_frame = build_frame(memory_string_data, 0, 1, -1, -1)
q.put(memory_frame)
print(memory_frame)
time.sleep(1.000)
def get_disk_info(q):
while True:
disk_usage = psutil.disk_usage("/")
disk_io_counters = psutil.disk_io_counters()
time.sleep(1.000)
print(disk_usage)
print(disk_io_counters)
def get_network_info(q):
while True:
net_io_counters = psutil.net_io_counters()
time.sleep(1.000)
print(net_io_counters)
这是我的消费者:
def data_handler(q):
while True:
next_element = q.get()
print(next_element)
print('Item received at data handler queue.')
我不是很清楚“如何让我在主文件中初始化的队列导入到其他函数文件中?”是什么意思。
通常,您将队列作为参数传递给函数,并在函数范围内使用它,而不管文件结构如何。或者执行用于任何其他数据类型的任何其他变量共享技术。
但是您的代码似乎有一些错误。首先,您不应该将 queue.Queue
与 multiprocessing
一起使用。它有自己的 class.
版本
q = multiprocessing.Queue()
它比 queue.Queue
慢,但它适用于跨进程共享数据。
其次,创建流程对象的正确方法是:
getDataHandlerProcess = multiprocessing.Process(target=data_handler, args = (q,))
否则您实际上是在调用 data_handler(q)
主线程并试图将其 return 值分配给 multiprocessing.Process
的 target
参数。您的 data_handler
函数永远不会 returns,因此在多处理甚至开始之前,程序可能会在此时陷入无限死锁。编辑:实际上它可能会进入无限等待,试图从一个永远不会被填充的空队列中获取一个元素。
我看到很多关于如何使用队列的教程,但它们总是显示它们在同一个文件中实现。我试图从一开始就组织好我的代码文件,因为我预计项目会变得非常大。如何让我在主文件中初始化的队列导入到其他函数文件中?
这是我的主要文件:
import multiprocessing
import queue
from data_handler import data_handler
from get_info import get_memory_info
from get_info import get_cpu_info
if __name__ == '__main__':
q = queue.Queue()
getDataHandlerProcess = multiprocessing.Process(target=data_handler(q))
getMemoryInfoProcess = multiprocessing.Process(target=get_memory_info(q))
getCPUInfoProcess = multiprocessing.Process(target=get_cpu_info(q))
getDataHandlerProcess.start()
getMemoryInfoProcess.start()
getCPUInfoProcess.start()
print("DEBUG: All tasks successfully started.")
这是我的制作人:
import psutil
import struct
import time
from data_frame import build_frame
def get_cpu_info(q):
while True:
cpu_string_data = bytes('', 'utf-8')
cpu_times = psutil.cpu_percent(interval=0.0, percpu=True)
for item in cpu_times:
cpu_string_data = cpu_string_data + struct.pack('<d',item)
cpu_frame = build_frame(cpu_string_data, 0, 0, -1, -1)
q.put(cpu_frame)
print(cpu_frame)
time.sleep(1.000)
def get_memory_info(q):
while True:
memory_string_data = bytes('', 'utf-8')
virtual_memory = psutil.virtual_memory()
swap_memory = psutil.swap_memory()
memory_info = list(virtual_memory+swap_memory)
for item in memory_info:
memory_string_data = memory_string_data + struct.pack('<d',item)
memory_frame = build_frame(memory_string_data, 0, 1, -1, -1)
q.put(memory_frame)
print(memory_frame)
time.sleep(1.000)
def get_disk_info(q):
while True:
disk_usage = psutil.disk_usage("/")
disk_io_counters = psutil.disk_io_counters()
time.sleep(1.000)
print(disk_usage)
print(disk_io_counters)
def get_network_info(q):
while True:
net_io_counters = psutil.net_io_counters()
time.sleep(1.000)
print(net_io_counters)
这是我的消费者:
def data_handler(q):
while True:
next_element = q.get()
print(next_element)
print('Item received at data handler queue.')
我不是很清楚“如何让我在主文件中初始化的队列导入到其他函数文件中?”是什么意思。
通常,您将队列作为参数传递给函数,并在函数范围内使用它,而不管文件结构如何。或者执行用于任何其他数据类型的任何其他变量共享技术。
但是您的代码似乎有一些错误。首先,您不应该将 queue.Queue
与 multiprocessing
一起使用。它有自己的 class.
q = multiprocessing.Queue()
它比 queue.Queue
慢,但它适用于跨进程共享数据。
其次,创建流程对象的正确方法是:
getDataHandlerProcess = multiprocessing.Process(target=data_handler, args = (q,))
否则您实际上是在调用 data_handler(q)
主线程并试图将其 return 值分配给 multiprocessing.Process
的 target
参数。您的 data_handler
函数永远不会 returns,因此在多处理甚至开始之前,程序可能会在此时陷入无限死锁。编辑:实际上它可能会进入无限等待,试图从一个永远不会被填充的空队列中获取一个元素。