具有多线程的 Celery worker - 如何同时更新结果
Celery worker with multithreading - how to update results concurently
我用 Celery worker 创建了一个 Flask API。用户触发 "start tests" 按钮,该按钮发出 POST 请求 returns url 用户可以使用该按钮每 5 秒获取一次测试结果(需要更新 fontend 进度条)。 Celery 任务包括线程。我的目标是根据线程的结果同时更新 Celery 任务状态。我不想等到我的所有线程都完成 return 结果。我的 Celery 任务如下所示:
@celery.task(bind=True) # bind argument instructs Celery to send a "self" argument and use it to record status updates
def run_tests(self, dialog_cases):
"""
Testing running as a background task
"""
results = []
test_case_no = 1
test_controller = TestController(dialog_cases)
bot_config = [test_controller.url, test_controller.headers, test_controller.db_name]
threads = []
queue = Queue()
start = time.perf_counter()
threads_list = list()
for test_case in test_controller.test_cases:
t = Thread(target=queue.put({randint(0,1000): TestCase(test_case, bot_config)}))
t.start()
threads_list.append(t)
for t in threads_list:
t.join()
results_dict_list = [queue.get() for _ in range(len(test_controller.test_cases))]
for result in results_dict_list:
for key, value in result.items():
cprint.info(f"{key}, {value.test_failed}")
现在:TestCase 是一个对象,它在创建时运行一个函数,该函数进行几次迭代,然后 returns 测试是否失败或通过。我有另一个 Flask 端点,它 return 是任务的状态。问题是如何同时获得线程returned 的值,而不必等到它们全部完成?我尝试了 Queue,但是当一切都结束时,这只能得到 return 个结果。
您可以简单地使用 update_state to modify state of the task, from each of those threads if that is what you want. Furthermore, you can create your own, custom states。由于您想在每个测试完成时就知道结果,因此为教学测试设置一个自定义状态似乎是个好主意,您可以在运行时从每个线程更新该状态。
一种替代方法是重构您的代码,以便每个测试实际上都是一个 Celery 任务。然后使用 Chord 或 Group 原语来构建您的工作流程。由于您想知道运行时的状态,那么也许 Group 更好,因为这样您就可以监视 GroupResult 对象的状态...
我用 Celery worker 创建了一个 Flask API。用户触发 "start tests" 按钮,该按钮发出 POST 请求 returns url 用户可以使用该按钮每 5 秒获取一次测试结果(需要更新 fontend 进度条)。 Celery 任务包括线程。我的目标是根据线程的结果同时更新 Celery 任务状态。我不想等到我的所有线程都完成 return 结果。我的 Celery 任务如下所示:
@celery.task(bind=True) # bind argument instructs Celery to send a "self" argument and use it to record status updates
def run_tests(self, dialog_cases):
"""
Testing running as a background task
"""
results = []
test_case_no = 1
test_controller = TestController(dialog_cases)
bot_config = [test_controller.url, test_controller.headers, test_controller.db_name]
threads = []
queue = Queue()
start = time.perf_counter()
threads_list = list()
for test_case in test_controller.test_cases:
t = Thread(target=queue.put({randint(0,1000): TestCase(test_case, bot_config)}))
t.start()
threads_list.append(t)
for t in threads_list:
t.join()
results_dict_list = [queue.get() for _ in range(len(test_controller.test_cases))]
for result in results_dict_list:
for key, value in result.items():
cprint.info(f"{key}, {value.test_failed}")
现在:TestCase 是一个对象,它在创建时运行一个函数,该函数进行几次迭代,然后 returns 测试是否失败或通过。我有另一个 Flask 端点,它 return 是任务的状态。问题是如何同时获得线程returned 的值,而不必等到它们全部完成?我尝试了 Queue,但是当一切都结束时,这只能得到 return 个结果。
您可以简单地使用 update_state to modify state of the task, from each of those threads if that is what you want. Furthermore, you can create your own, custom states。由于您想在每个测试完成时就知道结果,因此为教学测试设置一个自定义状态似乎是个好主意,您可以在运行时从每个线程更新该状态。
一种替代方法是重构您的代码,以便每个测试实际上都是一个 Celery 任务。然后使用 Chord 或 Group 原语来构建您的工作流程。由于您想知道运行时的状态,那么也许 Group 更好,因为这样您就可以监视 GroupResult 对象的状态...