在倭黑猩猩中使用有条件服务的最佳方式是什么?
What is the best way to use conditional services in Bonobo?
我想使用 Bonobo 将数据从一个 Postgres 数据库移动到不同服务上的另一个数据库。我配置了连接,想在提取期间使用一个,在加载期间使用一个。
这是我的测试设置:
source_connection_config_env = 'DEV'
source_connection_config = get_config(source_connection_config_env)
target_connection_config_env = 'TRAINING'
target_connection_config = get_target_connection_config(target_connection_config_env)
...
def get_services(**options):
if connection == 'source':
return {
'sqlalchemy.engine': create_postgresql_engine(**{
'host': source_connection_config.source_postres_connection['HOST'],
'name': source_connection_config.source_postres_connection['DATABASE'],
'user': source_connection_config.source_postres_connection['USER'],
'pass': source_connection_config.source_postres_connection['PASSWORD']
})
}
if connetion == 'target':
return {
'sqlalchemy.engine': create_postgresql_engine(**{
'host': target_connection_config.target_postres_connection['HOST'],
'name': target_connection_config.target_postres_connection['DATABASE'],
'user': target_connection_config.target_postres_connection['USER'],
'pass': target_connection_config.target_postres_connection['PASSWORD']
})
}
我不确定更改连接的最佳位置在哪里,或者实际如何操作。
提前致谢!
据我了解,您想在同一个图表中同时使用源连接和目标连接(希望我做对了)。
所以你不能有这个条件,因为它 return 只有一个。
相反,我会 return 两者,命名不同:
def get_services(**options):
return {
'engine.source': create_postgresql_engine(**{...}),
'engine.target': create_postgresql_engine(**{...}),
}
然后在转换中使用不同的连接:
graph.add_chain(
Select(..., engine='engine.source'),
...,
InsertOrUpdate(..., engine='engine.target'),
)
请注意,服务名称只是字符串,没有强制约定或命名模式。 'sqlalchemy.engine' 名称只是默认名称,但只要您使用实际使用的名称配置转换,您就不必同意它。
我想使用 Bonobo 将数据从一个 Postgres 数据库移动到不同服务上的另一个数据库。我配置了连接,想在提取期间使用一个,在加载期间使用一个。
这是我的测试设置:
source_connection_config_env = 'DEV'
source_connection_config = get_config(source_connection_config_env)
target_connection_config_env = 'TRAINING'
target_connection_config = get_target_connection_config(target_connection_config_env)
...
def get_services(**options):
if connection == 'source':
return {
'sqlalchemy.engine': create_postgresql_engine(**{
'host': source_connection_config.source_postres_connection['HOST'],
'name': source_connection_config.source_postres_connection['DATABASE'],
'user': source_connection_config.source_postres_connection['USER'],
'pass': source_connection_config.source_postres_connection['PASSWORD']
})
}
if connetion == 'target':
return {
'sqlalchemy.engine': create_postgresql_engine(**{
'host': target_connection_config.target_postres_connection['HOST'],
'name': target_connection_config.target_postres_connection['DATABASE'],
'user': target_connection_config.target_postres_connection['USER'],
'pass': target_connection_config.target_postres_connection['PASSWORD']
})
}
我不确定更改连接的最佳位置在哪里,或者实际如何操作。
提前致谢!
据我了解,您想在同一个图表中同时使用源连接和目标连接(希望我做对了)。
所以你不能有这个条件,因为它 return 只有一个。
相反,我会 return 两者,命名不同:
def get_services(**options):
return {
'engine.source': create_postgresql_engine(**{...}),
'engine.target': create_postgresql_engine(**{...}),
}
然后在转换中使用不同的连接:
graph.add_chain(
Select(..., engine='engine.source'),
...,
InsertOrUpdate(..., engine='engine.target'),
)
请注意,服务名称只是字符串,没有强制约定或命名模式。 'sqlalchemy.engine' 名称只是默认名称,但只要您使用实际使用的名称配置转换,您就不必同意它。