运行 来自另一个使用 celery 的周期性任务的任务
Run task from another periodic task with celery
我有周期性任务,应该触发另一个任务。最终预期行为:第一个任务应该从外部服务收集一些数据,然后遍历这些数据(列表)并调用另一个任务并传递参数(循环中的当前迭代)。我想让循环中的那些任务是异步的。
我写了一段代码 运行 是一个任务,但我不知道这个任务应该如何调用另一个任务,因为当我用 .delay()
方法执行它时,什么也没有发生。
这是我想要的一些简化代码运行:
@celery_app.task(name="Hello World")
def hello_world():
print(f"HELLO WORLD PRINT")
add.delay(2, 2)
return 'Hello'
@celery_app.task
def add(x, y):
with open(f"./{str(datetime.datetime.now())}.txt", 'w') as file:
file.write(str(x+y))
print(f"x + y = {x + y}")
return x + y
目前 hello_world()
每 30 秒 运行ning 一次,因此我在日志中收到 HELLO WORLD PRINT,但添加任务不是 运行ning。我看不到该任务应创建的打印件或文件。
评论更新,这是我使用队列的方式:
celery_app.conf.task_routes = {
"project.app.hello_world": {
"queue": 'test_queue'
},
"project.app.add": {
"queue": 'test_queue'
},
解决问题的方法很少。
显而易见的是将队列名称放在.apply_async中,例如add.apply_async(10, 10, queue="test_queue")
.
另一种方案是将队列放入任务注解中,即@celery_app.task(queue="test_queue")
.
我从未配置过 task_routes,但我相信可以像您尝试的那样在那里指定它...
我有周期性任务,应该触发另一个任务。最终预期行为:第一个任务应该从外部服务收集一些数据,然后遍历这些数据(列表)并调用另一个任务并传递参数(循环中的当前迭代)。我想让循环中的那些任务是异步的。
我写了一段代码 运行 是一个任务,但我不知道这个任务应该如何调用另一个任务,因为当我用 .delay()
方法执行它时,什么也没有发生。
这是我想要的一些简化代码运行:
@celery_app.task(name="Hello World")
def hello_world():
print(f"HELLO WORLD PRINT")
add.delay(2, 2)
return 'Hello'
@celery_app.task
def add(x, y):
with open(f"./{str(datetime.datetime.now())}.txt", 'w') as file:
file.write(str(x+y))
print(f"x + y = {x + y}")
return x + y
目前 hello_world()
每 30 秒 运行ning 一次,因此我在日志中收到 HELLO WORLD PRINT,但添加任务不是 运行ning。我看不到该任务应创建的打印件或文件。
评论更新,这是我使用队列的方式:
celery_app.conf.task_routes = {
"project.app.hello_world": {
"queue": 'test_queue'
},
"project.app.add": {
"queue": 'test_queue'
},
解决问题的方法很少。
显而易见的是将队列名称放在.apply_async中,例如add.apply_async(10, 10, queue="test_queue")
.
另一种方案是将队列放入任务注解中,即@celery_app.task(queue="test_queue")
.
我从未配置过 task_routes,但我相信可以像您尝试的那样在那里指定它...