运行 检查多处理队列进行处理的无限进程
Run an infinite process that checks multiprocessing queues for processing
程序分为三个部分:向队列添加数据、处理队列中的数据和检查队列是否为空。除了 Add() 之外,所有部分都有自己的过程。该程序应该像这样运行,当我们启动它时,
它应该继续检查队列,如果有的话 运行 函数,如果没有保持 运行ning。
同时,可以将异步数据添加到队列中,它应该运行如前所述的工作
一次只能从队列中处理一件事。
我正在使用 windows,而且我一直在使用
TypeError: can't pickle _thread.lock objects
这是代码
from multiprocessing import Queue,Process
from time import sleep
import threading
from multiprocessing.pool import ThreadPool
from multiprocessing import dummy as multithreading
import concurrent.futures
# import queue
class A(object):
def __init__(self, *args, **kwargs):
self.Q = Queue()
#Add data to queue: should be accessable all time
def Add(self, i):
# q = Queue()
self.Q.put(threading.Thread(target=self.printAns(i)))
#Processes the data: runs only upon call
def printAns(self,name):
print("Name to print is: ",name)
return 'completed'
#This function call printANS as a process
def jobRun(self):
# job = self.Q.get()
# ans = Queue()
jobThread = self.Q.get()
async_result = jRPool.apply_async(jobThread)
print(async_result.get())
#Checks if the queue has anything: checker functions needs to run constantly
def checkQueue(self):
while True:
if self.Q.empty():
pass
else:
return True
#should initiate call to checker upon success calls jobRun() as a process and go back to checking
def run(self):
with concurrent.futures.ProcessPoolExecutor() as executor:
checkfunc = executor.map(self.checkQueue)
while True:
if checkfunc:
sleep(1)
executor.map(self.jobRun)
self.Q.close()
if __name__ == '__main__':
a = A()
a.Add("test1")
a.Add("test2")
a.run()
# a.Add("this")
while True:
data = input("Enter a string: ")
a.Add(data)
非常感谢任何帮助。我的预感与锁或信号量有关。
如果我正确理解了您的要求,这样的内容可能对您有用。
compute
是远程进程中调用的函数。现在它只打印处理事物的工作人员的 PID 并打印出字符串的每个字母。这样您就可以看到进程并行工作。
tasks
是 poll_results
线程应跟踪的任务对象列表。
poll_results
是在主进程中运行的 threading.Thread
目标;它 busy-loops 处理 tasks
中的任务,一旦它们准备好就打印结果值。
import os
import time
import threading
from multiprocessing import Pool
def compute(value):
for ch in value:
print(os.getpid(), ch)
time.sleep(0.05)
return (value, len(value))
tasks = []
def poll_results():
while True:
for task in tasks[:]:
if task.ready():
print("Task finished:", task.get())
tasks.remove(task)
def main():
poller_thread = threading.Thread(target=poll_results)
poller_thread.start()
with Pool() as p:
t1 = p.apply_async(compute, ("hello",))
t2 = p.apply_async(compute, ("world",))
# Wait for the first results before entering the loop
t1.wait()
t2.wait()
while True:
data = input("Enter a string: ")
tasks.append(p.apply_async(compute, (data,)))
if __name__ == "__main__":
main()
这里的输出类似于
65755 h
65756 w
65755 e
65756 o
65755 l
65756 r
65755 l
65756 l
65755 o
65756 d
Enter a string: Hello, world!
Enter a string: 65757 H
65757 e
65757 l
65757 l
65757 o
65757 ,
65757
65757 w
65757 o
65757 r
65757 l
65757 d
65757 !
Task finished: ('Hello, world!', 13)
注意 IO 是如何交错的(您甚至在处理任务之前就会得到第二个 "Enter string" 提示,并且两个工作人员会尽可能打印出 hello 和 world 的字母)。
程序分为三个部分:向队列添加数据、处理队列中的数据和检查队列是否为空。除了 Add() 之外,所有部分都有自己的过程。该程序应该像这样运行,当我们启动它时,
我正在使用 windows,而且我一直在使用
TypeError: can't pickle _thread.lock objects
这是代码
from multiprocessing import Queue,Process
from time import sleep
import threading
from multiprocessing.pool import ThreadPool
from multiprocessing import dummy as multithreading
import concurrent.futures
# import queue
class A(object):
def __init__(self, *args, **kwargs):
self.Q = Queue()
#Add data to queue: should be accessable all time
def Add(self, i):
# q = Queue()
self.Q.put(threading.Thread(target=self.printAns(i)))
#Processes the data: runs only upon call
def printAns(self,name):
print("Name to print is: ",name)
return 'completed'
#This function call printANS as a process
def jobRun(self):
# job = self.Q.get()
# ans = Queue()
jobThread = self.Q.get()
async_result = jRPool.apply_async(jobThread)
print(async_result.get())
#Checks if the queue has anything: checker functions needs to run constantly
def checkQueue(self):
while True:
if self.Q.empty():
pass
else:
return True
#should initiate call to checker upon success calls jobRun() as a process and go back to checking
def run(self):
with concurrent.futures.ProcessPoolExecutor() as executor:
checkfunc = executor.map(self.checkQueue)
while True:
if checkfunc:
sleep(1)
executor.map(self.jobRun)
self.Q.close()
if __name__ == '__main__':
a = A()
a.Add("test1")
a.Add("test2")
a.run()
# a.Add("this")
while True:
data = input("Enter a string: ")
a.Add(data)
非常感谢任何帮助。我的预感与锁或信号量有关。
如果我正确理解了您的要求,这样的内容可能对您有用。
compute
是远程进程中调用的函数。现在它只打印处理事物的工作人员的 PID 并打印出字符串的每个字母。这样您就可以看到进程并行工作。tasks
是poll_results
线程应跟踪的任务对象列表。poll_results
是在主进程中运行的threading.Thread
目标;它 busy-loops 处理tasks
中的任务,一旦它们准备好就打印结果值。
import os
import time
import threading
from multiprocessing import Pool
def compute(value):
for ch in value:
print(os.getpid(), ch)
time.sleep(0.05)
return (value, len(value))
tasks = []
def poll_results():
while True:
for task in tasks[:]:
if task.ready():
print("Task finished:", task.get())
tasks.remove(task)
def main():
poller_thread = threading.Thread(target=poll_results)
poller_thread.start()
with Pool() as p:
t1 = p.apply_async(compute, ("hello",))
t2 = p.apply_async(compute, ("world",))
# Wait for the first results before entering the loop
t1.wait()
t2.wait()
while True:
data = input("Enter a string: ")
tasks.append(p.apply_async(compute, (data,)))
if __name__ == "__main__":
main()
这里的输出类似于
65755 h
65756 w
65755 e
65756 o
65755 l
65756 r
65755 l
65756 l
65755 o
65756 d
Enter a string: Hello, world!
Enter a string: 65757 H
65757 e
65757 l
65757 l
65757 o
65757 ,
65757
65757 w
65757 o
65757 r
65757 l
65757 d
65757 !
Task finished: ('Hello, world!', 13)
注意 IO 是如何交错的(您甚至在处理任务之前就会得到第二个 "Enter string" 提示,并且两个工作人员会尽可能打印出 hello 和 world 的字母)。