您如何将 Dagster 管道参数化为 运行 具有多个不同 configurations/assets 的相同实体?
How would you parameterize Dagster pipelines to run same solids with multiple different configurations/assets?
假设我创建了一个包含以下实体的 Dagster 管道:
- 从文件执行 SQL 查询并获取结果
- 将结果写入 table
我想同时为 10 个不同的 table 执行此操作。每个 table 需要不同的 SQL 查询。
什么是最好的方法?
一种方法是使用实体工厂。 run_query_solid_factory
和 write_results_solid_factory
是接受输入(例如名称和查询或 table)的实体工厂,return 是可以在管道中 运行 的实体。 summary_report
在打印摘要信息之前等待所有上游实体完成。
def run_query_solid_factory(name, query):
@solid(name=name)
def _run_query(context):
context.log.info(query)
return 'result'
return _run_query
def write_results_solid_factory(name, table):
@solid(name=name)
def _write_results(context, query_result):
context.log.info(table)
context.log.info(query_result)
return 'success'
return _write_results
@solid
def summary_report(context, statuses):
context.log.info(' '.join(statuses))
@pipeline
def pipeline():
solid_output_handles = []
queries = [('table', 'query'), ('table2', 'query2')]
for table, query in queries:
get_data = run_query_solid_factory('run_query_{}'.format(query), query)
write_results = write_results_solid_factory('write_results_to_table_{}'.format(table), table)
solid_output_handles.append(write_results(get_data()))
summary_report(solid_output_handles)
上一个答案:
我建议创建一个 composite_solid that consists of a solid that handles (1) and a solid that handles (2). Then, you can alias the composite_solid once for each of the 10 tables, which will let you pass in the SQL query via config (see tutorial)
假设我创建了一个包含以下实体的 Dagster 管道:
- 从文件执行 SQL 查询并获取结果
- 将结果写入 table
我想同时为 10 个不同的 table 执行此操作。每个 table 需要不同的 SQL 查询。 什么是最好的方法?
一种方法是使用实体工厂。 run_query_solid_factory
和 write_results_solid_factory
是接受输入(例如名称和查询或 table)的实体工厂,return 是可以在管道中 运行 的实体。 summary_report
在打印摘要信息之前等待所有上游实体完成。
def run_query_solid_factory(name, query):
@solid(name=name)
def _run_query(context):
context.log.info(query)
return 'result'
return _run_query
def write_results_solid_factory(name, table):
@solid(name=name)
def _write_results(context, query_result):
context.log.info(table)
context.log.info(query_result)
return 'success'
return _write_results
@solid
def summary_report(context, statuses):
context.log.info(' '.join(statuses))
@pipeline
def pipeline():
solid_output_handles = []
queries = [('table', 'query'), ('table2', 'query2')]
for table, query in queries:
get_data = run_query_solid_factory('run_query_{}'.format(query), query)
write_results = write_results_solid_factory('write_results_to_table_{}'.format(table), table)
solid_output_handles.append(write_results(get_data()))
summary_report(solid_output_handles)
上一个答案:
我建议创建一个 composite_solid that consists of a solid that handles (1) and a solid that handles (2). Then, you can alias the composite_solid once for each of the 10 tables, which will let you pass in the SQL query via config (see tutorial)