在 Airflow 的 python_callable 中执行 SSH 连接?
Perform SSH Connection inside Airflow's python_callable?
在 Airflow 1.10.10 DAG 中,我们有一个 ShortCircuitOperator
使用 python 函数 check_remote_server()
来决定分支。
在 check_remote_server_data()
函数中,我们如何启动与远程服务器的 SSH 连接,运行 在其上执行 bash 命令并获取结果?
是否可以使用我之前使用 Web UI 定义的 Airflow SSH 连接?
def check_remote_server_data():
pass # how can we use a predefined Airflow SSH connection, named `remote`?
with dag:
shortcircuitop = ShortCircuitOperator(
task_id='shortcircuitop',
python_callable=check_remote_server_data,
dag=dag
)
我只能使用 SSHOperator
来完成,但我需要使用结果来确定短路条件:
SSHOperator(
task_id='sshop',
ssh_conn_id='remote',
command='date +%F',
dag=dag)
您可以在 ShortCircuitOperator
中访问的 SSHOperator
supports an argument, do_xcom_push
to pass the output of the command as an XCom value:
def check_remote_server_data(**context):
xcom_stdout = context["task_instance"].xcom_pull(task_ids="ssh_task_id")
with dag:
ssh_operator = SSHOperator(task_id="ssh_task_id", do_xcom_push=True, ...)
shortcircuitop = ShortCircuitOperator(
task_id='shortcircuitop',
python_callable=check_remote_server_data,
provide_context=True,
dag=dag
)
ssh_operator >> shortcircuitop >> [other_task0, other_task1, ...]
在 Airflow 1.10.10 DAG 中,我们有一个 ShortCircuitOperator
使用 python 函数 check_remote_server()
来决定分支。
在 check_remote_server_data()
函数中,我们如何启动与远程服务器的 SSH 连接,运行 在其上执行 bash 命令并获取结果?
是否可以使用我之前使用 Web UI 定义的 Airflow SSH 连接?
def check_remote_server_data():
pass # how can we use a predefined Airflow SSH connection, named `remote`?
with dag:
shortcircuitop = ShortCircuitOperator(
task_id='shortcircuitop',
python_callable=check_remote_server_data,
dag=dag
)
我只能使用 SSHOperator
来完成,但我需要使用结果来确定短路条件:
SSHOperator(
task_id='sshop',
ssh_conn_id='remote',
command='date +%F',
dag=dag)
您可以在 ShortCircuitOperator
中访问的 SSHOperator
supports an argument, do_xcom_push
to pass the output of the command as an XCom value:
def check_remote_server_data(**context):
xcom_stdout = context["task_instance"].xcom_pull(task_ids="ssh_task_id")
with dag:
ssh_operator = SSHOperator(task_id="ssh_task_id", do_xcom_push=True, ...)
shortcircuitop = ShortCircuitOperator(
task_id='shortcircuitop',
python_callable=check_remote_server_data,
provide_context=True,
dag=dag
)
ssh_operator >> shortcircuitop >> [other_task0, other_task1, ...]