使用真实脚本实现 Airflow
Implementing Airflow with real scripts
我成功设置了 Airflow 服务器。我想 运行 一些测试工作,但我找不到适合我正在尝试做的事情的初学者指南。
当前状态:
- Python 脚本从 SFTP 下载文件(本地计算机上不存在的任何文件)或从查询创建文件
- Pandas 脚本将数据读入内存,以某种方式对其进行修改以为数据库做好准备(寻找新维度、重新映射、添加计算)。将数据加载到数据库中适当的 table。发送电子邮件摘要 (pandas to_html)
我的大部分脚本的逻辑都是基于如果文件还没有被处理,然后处理它。 'Processed' 文件要么按文件名组织在数据库 table 中,要么我将文件移动到经过特殊处理的文件夹中。
我的另一个逻辑是基于文件名中的日期。我将存在的文件日期与应该存在的日期(日期范围)进行比较。如果文件不存在,我会创建它(通常是 BCP 或 PSQL 查询)。
我只有 Airflow 运行 这些 .py 文件吗?或者我应该更改我的脚本以使用一些 Airflow parameters/jinja 模板吗?
我几乎觉得我几乎可以将 BashOperator 用于所有事情。这行得通吗
dag_input = sys.argv[1]
def alter_table(query, engine=pg_engine):
fake_conn = engine.raw_connection()
fake_cur = fake_conn.cursor()
fake_cur.execute(query)
fake_conn.commit()
fake_cur.close()
query_list = [
f'SELECT * from table_1 where report_date = \'{dag_input}\'',
f'SELECT * from table_2 where report_date = \'{dag_input}\'',
]
for value in query_list:
alter_table(value)
那么 dag 应该是这样的,气流参数用于 sys.argv?
templated_command = """
python download_raw.py "{{ ds }}"
"""
t3 = BashOperator(
task_id='download_raw',
bash_command=templated_command,
dag=dag)
由于此任务的代码在 python 中,我将使用 PythonOperator。
在 download_raw.py 中添加一个以 **kwargs 作为参数的方法,您就可以访问上下文中的所有内容。
from download_raw import my_func
t3 = PythonOperator(
task_id='download_raw',
python_callable=my_func,
dag=dag)
#inside download_raw.py
def my_func(**kwargs):
context = kwargs
ds = context['ds']
... (do your logic here)
我会这样做,否则你的 bash 命令可能会在上下文的多个部分变得可怕。
我成功设置了 Airflow 服务器。我想 运行 一些测试工作,但我找不到适合我正在尝试做的事情的初学者指南。
当前状态:
- Python 脚本从 SFTP 下载文件(本地计算机上不存在的任何文件)或从查询创建文件
- Pandas 脚本将数据读入内存,以某种方式对其进行修改以为数据库做好准备(寻找新维度、重新映射、添加计算)。将数据加载到数据库中适当的 table。发送电子邮件摘要 (pandas to_html)
我的大部分脚本的逻辑都是基于如果文件还没有被处理,然后处理它。 'Processed' 文件要么按文件名组织在数据库 table 中,要么我将文件移动到经过特殊处理的文件夹中。
我的另一个逻辑是基于文件名中的日期。我将存在的文件日期与应该存在的日期(日期范围)进行比较。如果文件不存在,我会创建它(通常是 BCP 或 PSQL 查询)。
我只有 Airflow 运行 这些 .py 文件吗?或者我应该更改我的脚本以使用一些 Airflow parameters/jinja 模板吗?
我几乎觉得我几乎可以将 BashOperator 用于所有事情。这行得通吗
dag_input = sys.argv[1]
def alter_table(query, engine=pg_engine):
fake_conn = engine.raw_connection()
fake_cur = fake_conn.cursor()
fake_cur.execute(query)
fake_conn.commit()
fake_cur.close()
query_list = [
f'SELECT * from table_1 where report_date = \'{dag_input}\'',
f'SELECT * from table_2 where report_date = \'{dag_input}\'',
]
for value in query_list:
alter_table(value)
那么 dag 应该是这样的,气流参数用于 sys.argv?
templated_command = """
python download_raw.py "{{ ds }}"
"""
t3 = BashOperator(
task_id='download_raw',
bash_command=templated_command,
dag=dag)
由于此任务的代码在 python 中,我将使用 PythonOperator。
在 download_raw.py 中添加一个以 **kwargs 作为参数的方法,您就可以访问上下文中的所有内容。
from download_raw import my_func
t3 = PythonOperator(
task_id='download_raw',
python_callable=my_func,
dag=dag)
#inside download_raw.py
def my_func(**kwargs):
context = kwargs
ds = context['ds']
... (do your logic here)
我会这样做,否则你的 bash 命令可能会在上下文的多个部分变得可怕。