你如何在 Django 中测试一个在自身内部委托任务的方法?
How do you test a method in django that delegates tasks within itself?
假设我有 2 个 celery 任务:
@app.task(name='process_one_line')
def process_one_line(line):
do_alot(line)
@app.task(name='process_one_file')
def process_one_file(file_id):
for line in get_file_by_id(file_id):
process_one_line.delay(line)
并说我有一个如下所示的测试用例:
def test_processing_a_file(self):
process_one_file(self.file_id)
这 运行 很好,但是委托的 process_one_line 方法实际上被推到测试用例之外的我的其他芹菜实例。简而言之,我的 redis 服务器 运行ning 在端口 6379 上接收任务,然后将它们推送给 "main" 工作人员。因此,无论 do_alot 进行什么更改,它们都会反映在实际数据库中,而不是在您 运行 django 测试时创建的测试数据库中。
我可以通过让它自己测试 process_one_line 来测试东西,但如果我能一起测试整个东西就更好了。
您可以将 CELERY_ALWAYS_EAGER=True
添加到您的测试设置中,以防止所有任务都发送到 redis。
假设我有 2 个 celery 任务:
@app.task(name='process_one_line')
def process_one_line(line):
do_alot(line)
@app.task(name='process_one_file')
def process_one_file(file_id):
for line in get_file_by_id(file_id):
process_one_line.delay(line)
并说我有一个如下所示的测试用例:
def test_processing_a_file(self):
process_one_file(self.file_id)
这 运行 很好,但是委托的 process_one_line 方法实际上被推到测试用例之外的我的其他芹菜实例。简而言之,我的 redis 服务器 运行ning 在端口 6379 上接收任务,然后将它们推送给 "main" 工作人员。因此,无论 do_alot 进行什么更改,它们都会反映在实际数据库中,而不是在您 运行 django 测试时创建的测试数据库中。
我可以通过让它自己测试 process_one_line 来测试东西,但如果我能一起测试整个东西就更好了。
您可以将 CELERY_ALWAYS_EAGER=True
添加到您的测试设置中,以防止所有任务都发送到 redis。