恢复芹菜 job.apply_async.join()
Resume Celery job.apply_async.join()
TL;DR: 如果我的生产者进程在向消费者发送一些工作后崩溃,它如何在重启后恢复等待消费者完成他们的工作?
producer.py
将工作项分派给一组消费者(已注册的 Celery 任务),如下所示:
from celery import group, signature
job = group(
signature(task_name, args=(x,)) for x in xrange(100)
)
group_result = job.apply_async()
group_result.join() # blocks until tasks complete
消费者需要很长时间才能完成,因此 possible/expected 生产者有时会在调用 join()
期间死亡。当生产者死亡时,它会重新启动。
produce重启后,有没有办法恢复join?
我是 Celery 新手;已经梳理了文档和示例,但还没有找到答案。希望高手能指点迷津
郑重声明,我的解决方案是存储任务 ID(在某些外部 file/db/whatever)。
job = group(
signature(task_name, args=(x,)) for x in xrange(100)
)
group_result = job.apply_async()
# store group_result.results somewhere durable
persist(some_db, group_result.results)
group_result.join() # blocks until tasks complete
现在,如果上面的join
失败了,我们可以这样恢复:
# read tasks from persistent storage
previous_pending_results = read_previously_persisted_results(some_db)
for result in previous_pending_results:
if result.status != 'SUCCESS':
...
TL;DR: 如果我的生产者进程在向消费者发送一些工作后崩溃,它如何在重启后恢复等待消费者完成他们的工作?
producer.py
将工作项分派给一组消费者(已注册的 Celery 任务),如下所示:
from celery import group, signature
job = group(
signature(task_name, args=(x,)) for x in xrange(100)
)
group_result = job.apply_async()
group_result.join() # blocks until tasks complete
消费者需要很长时间才能完成,因此 possible/expected 生产者有时会在调用 join()
期间死亡。当生产者死亡时,它会重新启动。
produce重启后,有没有办法恢复join?
我是 Celery 新手;已经梳理了文档和示例,但还没有找到答案。希望高手能指点迷津
郑重声明,我的解决方案是存储任务 ID(在某些外部 file/db/whatever)。
job = group(
signature(task_name, args=(x,)) for x in xrange(100)
)
group_result = job.apply_async()
# store group_result.results somewhere durable
persist(some_db, group_result.results)
group_result.join() # blocks until tasks complete
现在,如果上面的join
失败了,我们可以这样恢复:
# read tasks from persistent storage
previous_pending_results = read_previously_persisted_results(some_db)
for result in previous_pending_results:
if result.status != 'SUCCESS':
...