你如何在 Django 中测试一个在自身内部委托任务的方法?

How do you test a method in django that delegates tasks within itself?

假设我有 2 个 celery 任务:

@app.task(name='process_one_line')
def process_one_line(line):
    do_alot(line)

@app.task(name='process_one_file')
def process_one_file(file_id):
    for line in get_file_by_id(file_id):
        process_one_line.delay(line)

并说我有一个如下所示的测试用例:

def test_processing_a_file(self):
    process_one_file(self.file_id)

这 运行 很好,但是委托的 process_one_line 方法实际上被推到测试用例之外的我的其他芹菜实例。简而言之,我的 redis 服务器 运行ning 在端口 6379 上接收任务,然后将它们推送给 "main" 工作人员。因此,无论 do_alot 进行什么更改,它们都会反映在实际数据库中,而不是在您 运行 django 测试时创建的测试数据库中。

我可以通过让它自己测试 process_one_line 来测试东西,但如果我能一起测试整个东西就更好了。

您可以将 CELERY_ALWAYS_EAGER=True 添加到您的测试设置中,以防止所有任务都发送到 redis。