具有 SSH 连接的 Airflow DAG 无法按计划同时启动 运行
Airflow DAGs with SSH connection can't start running in the same time by schedule
我使用 Ubuntu 在本地计算机 运行ning Win 10 上安装了 Airflow 2.0。我使用 PostgreSQL 作为数据库,CeleryExecutor 和 RabbitMQ 作为 Celery 后端。我创建了一些 DAG,每个 DAG 通过 SSH 隧道连接到 Redshift 数据库并执行 SQL 命令。当我手动触发或通过调度程序 运行 触发时,每个 DAG 运行s 顺利。
但是,当我为这些 DAG 设置同时开始 运行ning 的时间表时遇到错误。例如,如果 DAG1 和 DAG2 在 8:00 AM 运行ning 开始,这两个 dag 将失败并显示以下错误:
psycopg2.OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
如果我将这 2 个日期设置在不同的时间开始,一切 运行 都会很顺利。另外,如果我将 2 个 dags 组合成 1 个 dag 和 2 个任务,这个组合的 dag 运行s 很好。
这是我的 DAG 代码,每个 dag 都一样(只是 SQL 命令不同):
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
import time
dag = DAG('test', description='Simple test tutorial DAG',
schedule_interval= None,
start_date=datetime(2021, 1, 6), tags = ['test'])
def select_from_tunnel_db():
# Open SSH tunnel
ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
tunnel = ssh_hook.get_tunnel(remote_port = 5439, remote_host='**.**.**.**', local_port=5439)
tunnel.start()
# Connect to DB and run query
pg_hook = PostgresHook(
postgres_conn_id='dw', # NOTE: host='localhost'
schema='test'
)
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('''
insert into abc values (1, 'a')
''')
cursor.close()
conn.commit()
conn.close()
python_operator = PythonOperator(
task_id='test_tunnel_conn',
python_callable=select_from_tunnel_db,
)
找到解决方法了,回来更新。希望它能有用。
Tunnel 有一个超时间隔(我不知道具体的默认值,但很确定它小于 1 秒),所以我们需要将它设置得更大。创建隧道后再添加 1 行代码:
sshtunnel.SSH_TIMEOUT = sshtunnel.TUNNEL_TIMEOUT = 5.0
我使用 Ubuntu 在本地计算机 运行ning Win 10 上安装了 Airflow 2.0。我使用 PostgreSQL 作为数据库,CeleryExecutor 和 RabbitMQ 作为 Celery 后端。我创建了一些 DAG,每个 DAG 通过 SSH 隧道连接到 Redshift 数据库并执行 SQL 命令。当我手动触发或通过调度程序 运行 触发时,每个 DAG 运行s 顺利。
但是,当我为这些 DAG 设置同时开始 运行ning 的时间表时遇到错误。例如,如果 DAG1 和 DAG2 在 8:00 AM 运行ning 开始,这两个 dag 将失败并显示以下错误:
psycopg2.OperationalError: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request.
如果我将这 2 个日期设置在不同的时间开始,一切 运行 都会很顺利。另外,如果我将 2 个 dags 组合成 1 个 dag 和 2 个任务,这个组合的 dag 运行s 很好。
这是我的 DAG 代码,每个 dag 都一样(只是 SQL 命令不同):
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
import time
dag = DAG('test', description='Simple test tutorial DAG',
schedule_interval= None,
start_date=datetime(2021, 1, 6), tags = ['test'])
def select_from_tunnel_db():
# Open SSH tunnel
ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
tunnel = ssh_hook.get_tunnel(remote_port = 5439, remote_host='**.**.**.**', local_port=5439)
tunnel.start()
# Connect to DB and run query
pg_hook = PostgresHook(
postgres_conn_id='dw', # NOTE: host='localhost'
schema='test'
)
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('''
insert into abc values (1, 'a')
''')
cursor.close()
conn.commit()
conn.close()
python_operator = PythonOperator(
task_id='test_tunnel_conn',
python_callable=select_from_tunnel_db,
)
找到解决方法了,回来更新。希望它能有用。
Tunnel 有一个超时间隔(我不知道具体的默认值,但很确定它小于 1 秒),所以我们需要将它设置得更大。创建隧道后再添加 1 行代码:
sshtunnel.SSH_TIMEOUT = sshtunnel.TUNNEL_TIMEOUT = 5.0