具有多线程的 Celery worker - 如何同时更新结果

Celery worker with multithreading - how to update results concurently

我用 Celery worker 创建了一个 Flask API。用户触发 "start tests" 按钮,该按钮发出 POST 请求 returns url 用户可以使用该按钮每 5 秒获取一次测试结果(需要更新 fontend 进度条)。 Celery 任务包括线程。我的目标是根据线程的结果同时更新 Celery 任务状态。我不想等到我的所有线程都完成 return 结果。我的 Celery 任务如下所示:

@celery.task(bind=True)  # bind argument instructs Celery to send a "self" argument and use it to record status updates
def run_tests(self, dialog_cases):
    """
    Testing running as a background task
    """
    results = []
    test_case_no = 1
    test_controller = TestController(dialog_cases)
    bot_config = [test_controller.url, test_controller.headers, test_controller.db_name]
    threads = []
    queue = Queue()
    start = time.perf_counter()
    threads_list = list()
    for test_case in test_controller.test_cases:
        t = Thread(target=queue.put({randint(0,1000): TestCase(test_case, bot_config)}))
        t.start()
        threads_list.append(t)

    for t in threads_list:
        t.join()
    results_dict_list = [queue.get() for _ in range(len(test_controller.test_cases))]
    for result in results_dict_list:
        for key, value in result.items():
            cprint.info(f"{key}, {value.test_failed}")

现在:TestCase 是一个对象,它在创建时运行一个函数,该函数进行几次迭代,然后 returns 测试是否失败或通过。我有另一个 Flask 端点,它 return 是任务的状态。问题是如何同时获得线程returned 的值,而不必等到它们全部完成?我尝试了 Queue,但是当一切都结束时,这只能得到 return 个结果。

您可以简单地使用 update_state to modify state of the task, from each of those threads if that is what you want. Furthermore, you can create your own, custom states。由于您想在每个测试完成时就知道结果,因此为教学测试设置一个自定义状态似乎是个好主意,您可以在运行时从每个线程更新该状态。

一种替代方法是重构您的代码,以便每个测试实际上都是一个 Celery 任务。然后使用 Chord 或 Group 原语来构建您的工作流程。由于您想知道运行时的状态,那么也许 Group 更好,因为这样您就可以监视 GroupResult 对象的状态...