Celery:异步检索先前任务的部分结果

Celery: Asynchronously Retrieve Partial Results from Previous Task

我想链接我的 ETL 工作流,从而加载任务可以异步流式传输来自 ExtractTransform 任务的部分结果,而不必等待 ExtractTransform 完成。芹菜可以吗?

我正在考虑的两种方法:

方法一

创建一个 ETLTask,其中 LoadTask(以某种方式)不断从 ETLTask 获取部分结果并将其出列(实质上将生产者和消费者分开)。我无法从 AsyncResult 判断是否可能。听起来我只想走独立的生产者和消费者的道路,我不确定在 Celery 中该怎么做。

class ExtractTransformTask(Task):

    def long_running_extract_transform(self):
        pass

    def run(self):
        return self.long_running_extract_transform()

class LoadTask(Task):

    def long_running_load(self):
        pass

    def run(self, results):
        self.long_running_load(results)

class ETLTask(Task):

    def run(self):
        et_result = ExtractTransformTask.delay()
        # while et_result PENDING or SUCCESS
        # dequeue current results and load with LoadTask instance

方法二

分块提取源数据并创建多个加载任务。

使用方法 2 的解决方案

class ExtractTransformMixin(object):

    def long_running_extract_transform(self, chunkify=False):
        pass

class LoadTask(Task):

    def long_running_load(self):
        pass

    def run(self, results):
        self.long_running_load(results)

class ETLTask(ExtractTransformMixin, Task):

    def run(self):
        load_results = ResultSet([])
        for chunk in long_running_extract_transform(chunkify=True):
            load_results.add(LoadTask().delay(chunk))
        return load_results