使用真实脚本实现 Airflow

Implementing Airflow with real scripts

我成功设置了 Airflow 服务器。我想 运行 一些测试工作,但我找不到适合我正在尝试做的事情的初学者指南。

当前状态:

我的大部分脚本的逻辑都是基于如果文件还没有被处理,然后处理它。 'Processed' 文件要么按文件名组织在数据库 table 中,要么我将文件移动到经过特殊处理的文件夹中。

我的另一个逻辑是基于文件名中的日期。我将存在的文件日期与应该存在的日期(日期范围)进行比较。如果文件不存在,我会创建它(通常是 BCP 或 PSQL 查询)。

我只有 Airflow 运行 这些 .py 文件吗?或者我应该更改我的脚本以使用一些 Airflow parameters/jinja 模板吗?

我几乎觉得我几乎可以将 BashOperator 用于所有事情。这行得通吗

dag_input = sys.argv[1]

def alter_table(query, engine=pg_engine):
    fake_conn = engine.raw_connection()
    fake_cur = fake_conn.cursor()
    fake_cur.execute(query)
    fake_conn.commit()
    fake_cur.close()

query_list = [
              f'SELECT * from table_1 where report_date = \'{dag_input}\'',
              f'SELECT * from table_2 where report_date = \'{dag_input}\'',
]

for value in query_list:
    alter_table(value)

那么 dag 应该是这样的,气流参数用于 sys.argv?

templated_command = """
        python download_raw.py "{{ ds }}"
"""

t3 = BashOperator(
    task_id='download_raw',
    bash_command=templated_command,
    dag=dag)

由于此任务的代码在 python 中,我将使用 PythonOperator。

在 download_raw.py 中添加一个以 **kwargs 作为参数的方法,您就可以访问上下文中的所有内容。

from download_raw import my_func

t3 = PythonOperator(
    task_id='download_raw',
    python_callable=my_func,
    dag=dag)


#inside download_raw.py

def my_func(**kwargs):
    context = kwargs
    ds = context['ds']
    ... (do your logic here)

我会这样做,否则你的 bash 命令可能会在上下文的多个部分变得可怕。