在 Airflow 的 python_callable 中执行 SSH 连接?

Perform SSH Connection inside Airflow's python_callable?

在 Airflow 1.10.10 DAG 中,我们有一个 ShortCircuitOperator 使用 python 函数 check_remote_server() 来决定分支。

check_remote_server_data() 函数中,我们如何启动与远程服务器的 SSH 连接,运行 在其上执行 bash 命令并获取结果?

是否可以使用我之前使用 Web UI 定义的 Airflow SSH 连接?

def check_remote_server_data():
    pass   # how can we use a predefined Airflow SSH connection, named `remote`?

with dag:
    shortcircuitop = ShortCircuitOperator(
        task_id='shortcircuitop',
        python_callable=check_remote_server_data,
        dag=dag
    )

我只能使用 SSHOperator 来完成,但我需要使用结果来确定短路条件:

SSHOperator(
    task_id='sshop',
    ssh_conn_id='remote',
    command='date +%F',
dag=dag)

您可以在 ShortCircuitOperator 中访问的 SSHOperator supports an argument, do_xcom_push to pass the output of the command as an XCom value

def check_remote_server_data(**context):
    xcom_stdout = context["task_instance"].xcom_pull(task_ids="ssh_task_id")

with dag:
    ssh_operator = SSHOperator(task_id="ssh_task_id", do_xcom_push=True, ...)

    shortcircuitop = ShortCircuitOperator(
        task_id='shortcircuitop',
        python_callable=check_remote_server_data,
        provide_context=True,
        dag=dag
    )

    ssh_operator >> shortcircuitop >> [other_task0, other_task1, ...]