使用任务创建 celery 管道,运行 只有当上一个任务中的一些项目被堆叠时
Creating celery pipeline with tasks, that run only when some number of items in previous task are stacked
我正在尝试为我的应用程序创建某种管道。我有一个问题 - 应用程序的主要目标是读取视频,拍摄每 N 次视频,并将其放入管道中。管道内部有 5 个不同的任务,例如:
1. Crop image
2. Store image in the array. if array length = IMAGES_NEEDED_FOR_TASK3, launch task 3
3. Apply some transforms to image, make one big image from IMAGES_NEEDED_FOR_TASK3,.
4. Stack transformed images in the array. if array length = IMAGES_NEEDED_FOR_TASK5, launch task 5
5. Write info about income images from task 4 to database
我很难实现任务 2 和 4,因为它们有条件。如果他们没有条件,我就用链式方法。我考虑过从任务 2 调用任务 3(我想为每个任务创建一个不同的队列),但我读到这被认为是不好的做法。提前谢谢你
很难用 Celery 工作流来表达这一点(如果这是你正在努力解决的问题),所以我建议你编写任务 4,这样当堆栈有 IMAGES_NEEDED_FOR_TASK5 图像(只需调用 app.send_task() 或 app.apply_async())。
任务 #2 也是如此。只要你不需要处理任务 3 和任务 5 的结果就可以了,那样的话逻辑会变得更复杂。如果 Celery 有一个原语可以更容易地表达这样的情况,那就太好了。
如果你仍然坚持使用 Celery 工作流,那么你或许应该重新考虑逻辑,并利用 Chunk primitive。在这种情况下,您的任务 #2 将 add.chunks()
。任务 #5 也是如此。
我正在尝试为我的应用程序创建某种管道。我有一个问题 - 应用程序的主要目标是读取视频,拍摄每 N 次视频,并将其放入管道中。管道内部有 5 个不同的任务,例如:
1. Crop image
2. Store image in the array. if array length = IMAGES_NEEDED_FOR_TASK3, launch task 3
3. Apply some transforms to image, make one big image from IMAGES_NEEDED_FOR_TASK3,.
4. Stack transformed images in the array. if array length = IMAGES_NEEDED_FOR_TASK5, launch task 5
5. Write info about income images from task 4 to database
我很难实现任务 2 和 4,因为它们有条件。如果他们没有条件,我就用链式方法。我考虑过从任务 2 调用任务 3(我想为每个任务创建一个不同的队列),但我读到这被认为是不好的做法。提前谢谢你
很难用 Celery 工作流来表达这一点(如果这是你正在努力解决的问题),所以我建议你编写任务 4,这样当堆栈有 IMAGES_NEEDED_FOR_TASK5 图像(只需调用 app.send_task() 或 app.apply_async())。
任务 #2 也是如此。只要你不需要处理任务 3 和任务 5 的结果就可以了,那样的话逻辑会变得更复杂。如果 Celery 有一个原语可以更容易地表达这样的情况,那就太好了。
如果你仍然坚持使用 Celery 工作流,那么你或许应该重新考虑逻辑,并利用 Chunk primitive。在这种情况下,您的任务 #2 将 add.chunks()
。任务 #5 也是如此。