Celery:异步检索先前任务的部分结果
Celery: Asynchronously Retrieve Partial Results from Previous Task
我想链接我的 ETL 工作流,从而加载任务可以异步流式传输来自 ExtractTransform 任务的部分结果,而不必等待 ExtractTransform 完成。芹菜可以吗?
我正在考虑的两种方法:
方法一
创建一个 ETLTask,其中 LoadTask(以某种方式)不断从 ETLTask 获取部分结果并将其出列(实质上将生产者和消费者分开)。我无法从 AsyncResult 判断是否可能。听起来我只想走独立的生产者和消费者的道路,我不确定在 Celery 中该怎么做。
class ExtractTransformTask(Task):
def long_running_extract_transform(self):
pass
def run(self):
return self.long_running_extract_transform()
class LoadTask(Task):
def long_running_load(self):
pass
def run(self, results):
self.long_running_load(results)
class ETLTask(Task):
def run(self):
et_result = ExtractTransformTask.delay()
# while et_result PENDING or SUCCESS
# dequeue current results and load with LoadTask instance
方法二
分块提取源数据并创建多个加载任务。
使用方法 2 的解决方案。
class ExtractTransformMixin(object):
def long_running_extract_transform(self, chunkify=False):
pass
class LoadTask(Task):
def long_running_load(self):
pass
def run(self, results):
self.long_running_load(results)
class ETLTask(ExtractTransformMixin, Task):
def run(self):
load_results = ResultSet([])
for chunk in long_running_extract_transform(chunkify=True):
load_results.add(LoadTask().delay(chunk))
return load_results
我想链接我的 ETL 工作流,从而加载任务可以异步流式传输来自 ExtractTransform 任务的部分结果,而不必等待 ExtractTransform 完成。芹菜可以吗?
我正在考虑的两种方法:
方法一
创建一个 ETLTask,其中 LoadTask(以某种方式)不断从 ETLTask 获取部分结果并将其出列(实质上将生产者和消费者分开)。我无法从 AsyncResult 判断是否可能。听起来我只想走独立的生产者和消费者的道路,我不确定在 Celery 中该怎么做。
class ExtractTransformTask(Task):
def long_running_extract_transform(self):
pass
def run(self):
return self.long_running_extract_transform()
class LoadTask(Task):
def long_running_load(self):
pass
def run(self, results):
self.long_running_load(results)
class ETLTask(Task):
def run(self):
et_result = ExtractTransformTask.delay()
# while et_result PENDING or SUCCESS
# dequeue current results and load with LoadTask instance
方法二
分块提取源数据并创建多个加载任务。
使用方法 2 的解决方案。
class ExtractTransformMixin(object):
def long_running_extract_transform(self, chunkify=False):
pass
class LoadTask(Task):
def long_running_load(self):
pass
def run(self, results):
self.long_running_load(results)
class ETLTask(ExtractTransformMixin, Task):
def run(self):
load_results = ResultSet([])
for chunk in long_running_extract_transform(chunkify=True):
load_results.add(LoadTask().delay(chunk))
return load_results