如何确保 Celery 任务在 Pytest 中排队?
How to make sure a Celery task was enqueued in Pytest?
我有一个 API 端点来注册新用户。 "welcome email" 将排队并异步执行此任务。我有 2 个单元测试要检查:
- Api 确实将用户信息保存到数据库 OK
- Celery 任务确实会发送包含正确内容+模板的电子邮件
我想添加第三个单元测试以确保"The endpoint has to enqueue email-sending after saving user form to DB"
我尝试使用 celery.AsyncResult,但它要求我 运行 工作人员。此外,即使工作人员准备就绪,我们仍然无法验证任务是否已入队,因为模棱两可的 PENDING 状态:
- 队列中存在任务但尚未执行:PENDING
- 队列中不存在任务:待定
有人遇到过这个问题吗?我该如何解决?
在测试环境中解决这个问题的常用方法是使用 task_always_eager 配置设置,它基本上指示 Celery 运行 像常规函数一样的任务。 Celery 将创建一个行为相同但执行逻辑完全不同的 EagerResult 类型的对象,而不是 AsyncResult。
我有一个 API 端点来注册新用户。 "welcome email" 将排队并异步执行此任务。我有 2 个单元测试要检查:
- Api 确实将用户信息保存到数据库 OK
- Celery 任务确实会发送包含正确内容+模板的电子邮件
我想添加第三个单元测试以确保"The endpoint has to enqueue email-sending after saving user form to DB"
我尝试使用 celery.AsyncResult,但它要求我 运行 工作人员。此外,即使工作人员准备就绪,我们仍然无法验证任务是否已入队,因为模棱两可的 PENDING 状态:
- 队列中存在任务但尚未执行:PENDING
- 队列中不存在任务:待定
有人遇到过这个问题吗?我该如何解决?
在测试环境中解决这个问题的常用方法是使用 task_always_eager 配置设置,它基本上指示 Celery 运行 像常规函数一样的任务。 Celery 将创建一个行为相同但执行逻辑完全不同的 EagerResult 类型的对象,而不是 AsyncResult。