在倭黑猩猩中使用有条件服务的最佳方式是什么?

What is the best way to use conditional services in Bonobo?

我想使用 Bonobo 将数据从一个 Postgres 数据库移动到不同服务上的另一个数据库。我配置了连接,想在提取期间使用一个,在加载期间使用一个。

这是我的测试设置:

source_connection_config_env = 'DEV'
source_connection_config = get_config(source_connection_config_env)

target_connection_config_env = 'TRAINING'
target_connection_config = get_target_connection_config(target_connection_config_env)

...

def get_services(**options):
    if connection == 'source':
        return {
            'sqlalchemy.engine': create_postgresql_engine(**{
                    'host': source_connection_config.source_postres_connection['HOST'],
                    'name': source_connection_config.source_postres_connection['DATABASE'],
                    'user': source_connection_config.source_postres_connection['USER'],
                    'pass': source_connection_config.source_postres_connection['PASSWORD']
                })
        }

    if connetion == 'target':
        return {
            'sqlalchemy.engine': create_postgresql_engine(**{
                    'host': target_connection_config.target_postres_connection['HOST'],
                    'name': target_connection_config.target_postres_connection['DATABASE'],
                    'user': target_connection_config.target_postres_connection['USER'],
                    'pass': target_connection_config.target_postres_connection['PASSWORD']
                })
        }

我不确定更改连接的最佳位置在哪里,或者实际如何操作。

提前致谢!

据我了解,您想在同一个图表中同时使用源连接和目标连接(希望我做对了)。

所以你不能有这个条件,因为它 return 只有一个。

相反,我会 return 两者,命名不同:

def get_services(**options):
    return {
        'engine.source': create_postgresql_engine(**{...}),
        'engine.target': create_postgresql_engine(**{...}),
    }

然后在转换中使用不同的连接:

graph.add_chain(
    Select(..., engine='engine.source'),
    ...,
    InsertOrUpdate(..., engine='engine.target'),
)

请注意,服务名称只是字符串,没有强制约定或命名模式。 'sqlalchemy.engine' 名称只是默认名称,但只要您使用实际使用的名称配置转换,您就不必同意它。