Python 服务器:如何在确保一次只处理一个任务的同时连续对任务进行排队
Python server: How can I continously queue tasks while making sure only one at a time is processed
我有两个任务,一个每两秒调用一次,另一个随机调用。两者都需要访问一个在上一个调用完成之前无法调用的对象(如果发生这种情况我需要手动重启硬件设备)。
该对象来自 class,它允许通过套接字与硬件设备通信。
为此,我创建了一个线程 class,以便 运行 一切都在后台进行,没有其他任务被阻止。在这个 class 中,我实现了一个队列:两个不同的函数将任务放入队列中,一个工作人员应该执行任务!不是!同时
由于整个项目都是一个服务器,因此它应该 运行 连续。
好吧,这是我的代码,它显然不起作用。如果有人知道如何解决这个问题,我会很高兴。
更新:2020 年 10 月 26 日
为了使我的问题更清楚,我根据 Artiom Kozyrev 的回答更新了代码。
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, job):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {job * 10}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
def TimeStamp(msg):
tElapsed = (time.time() - tStart) # Display Thread Info
sElap = int(tElapsed)
msElap = int((tElapsed - sElap) * 1000)
usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
print(msg , ': ', sElap, 's', msElap, 'ms', usElap, 'us')
def f1():
TimeStamp("f1 start")
time.sleep(2)
TimeStamp("f1 finished")
def f2():
TimeStamp("f2 start")
time.sleep(6)
TimeStamp("f2 finished")
def insertf1():
for i in range(10):
q.put(f1())
time.sleep(2)
def insertf2():
for i in range(10):
time.sleep(10)
q.put(f2())
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
输出为:
f1 开始:0 s 0 ms 0 us
f1 完成:2 秒 2 毫秒 515 us
f1 开始:4 秒 9 毫秒 335 微秒
f1 完成:6 秒 9 毫秒 932 us
f1 开始:8 秒 17 毫秒 428 us
f2 开始:10 秒 12 毫秒 794 us
f1 完成:10 秒 28 毫秒 633 us
f1 开始:12 秒 29 毫秒 182 us
f1 完成:14 秒 34 毫秒 411 us
f2 完成:16 秒 19 毫秒 330 us
f1 在f2 完成之前开始,这是需要避免的。
为此,您需要结合使用 Queue
和 Lock
。锁定将阻止 worker-threads 同时工作。查找下面的代码示例:
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, job):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {job * 10}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
if __name__ == '__main__':
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
# produce tasks
for i in range(10):
q.put(i)
# stop tasks with "poison pillow"
for i in range(len(workers)):
q.put(None)
根据问题的补充进行编辑(已添加锁定)
主要思想是你不应该 运行 f1 和 f2 没有锁。
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, f):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {f()}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
def TimeStamp(msg):
tElapsed = (time.time() - tStart) # Display Thread Info
sElap = int(tElapsed)
msElap = int((tElapsed - sElap) * 1000)
usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
print(msg, ': ', sElap, 's', msElap, 'ms', usElap, 'us')
def f1():
TimeStamp("f1 start")
time.sleep(1)
TimeStamp("f1 finished")
return f"Func-1-{threading.current_thread().getName()}"
def f2():
TimeStamp("f2 start")
time.sleep(3)
TimeStamp("f2 finished")
return f"Func-2-{threading.current_thread().getName()}"
def insertf1():
for i in range(5):
q.put(f1) # do not run f1 here! Run it in worker thread with Lock
def insertf2():
for i in range(5):
q.put(f2) # do not run f2 here! Run it in worker thread with Lock
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
我有两个任务,一个每两秒调用一次,另一个随机调用。两者都需要访问一个在上一个调用完成之前无法调用的对象(如果发生这种情况我需要手动重启硬件设备)。
该对象来自 class,它允许通过套接字与硬件设备通信。
为此,我创建了一个线程 class,以便 运行 一切都在后台进行,没有其他任务被阻止。在这个 class 中,我实现了一个队列:两个不同的函数将任务放入队列中,一个工作人员应该执行任务!不是!同时
由于整个项目都是一个服务器,因此它应该 运行 连续。
好吧,这是我的代码,它显然不起作用。如果有人知道如何解决这个问题,我会很高兴。
更新:2020 年 10 月 26 日 为了使我的问题更清楚,我根据 Artiom Kozyrev 的回答更新了代码。
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, job):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {job * 10}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
def TimeStamp(msg):
tElapsed = (time.time() - tStart) # Display Thread Info
sElap = int(tElapsed)
msElap = int((tElapsed - sElap) * 1000)
usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
print(msg , ': ', sElap, 's', msElap, 'ms', usElap, 'us')
def f1():
TimeStamp("f1 start")
time.sleep(2)
TimeStamp("f1 finished")
def f2():
TimeStamp("f2 start")
time.sleep(6)
TimeStamp("f2 finished")
def insertf1():
for i in range(10):
q.put(f1())
time.sleep(2)
def insertf2():
for i in range(10):
time.sleep(10)
q.put(f2())
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
输出为:
f1 开始:0 s 0 ms 0 us
f1 完成:2 秒 2 毫秒 515 us
f1 开始:4 秒 9 毫秒 335 微秒
f1 完成:6 秒 9 毫秒 932 us
f1 开始:8 秒 17 毫秒 428 us
f2 开始:10 秒 12 毫秒 794 us
f1 完成:10 秒 28 毫秒 633 us
f1 开始:12 秒 29 毫秒 182 us
f1 完成:14 秒 34 毫秒 411 us
f2 完成:16 秒 19 毫秒 330 us
f1 在f2 完成之前开始,这是需要避免的。
为此,您需要结合使用 Queue
和 Lock
。锁定将阻止 worker-threads 同时工作。查找下面的代码示例:
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, job):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {job * 10}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
if __name__ == '__main__':
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
# produce tasks
for i in range(10):
q.put(i)
# stop tasks with "poison pillow"
for i in range(len(workers)):
q.put(None)
根据问题的补充进行编辑(已添加锁定)
主要思想是你不应该 运行 f1 和 f2 没有锁。
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, f):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {f()}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
def TimeStamp(msg):
tElapsed = (time.time() - tStart) # Display Thread Info
sElap = int(tElapsed)
msElap = int((tElapsed - sElap) * 1000)
usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
print(msg, ': ', sElap, 's', msElap, 'ms', usElap, 'us')
def f1():
TimeStamp("f1 start")
time.sleep(1)
TimeStamp("f1 finished")
return f"Func-1-{threading.current_thread().getName()}"
def f2():
TimeStamp("f2 start")
time.sleep(3)
TimeStamp("f2 finished")
return f"Func-2-{threading.current_thread().getName()}"
def insertf1():
for i in range(5):
q.put(f1) # do not run f1 here! Run it in worker thread with Lock
def insertf2():
for i in range(5):
q.put(f2) # do not run f2 here! Run it in worker thread with Lock
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()