如何在气流的 S3KeySensor 中动态添加 bucket_key 值
How to dynamically add bucket_key value in airflow's S3KeySensor
我正在尝试根据 dagrun 输入变量设置 S3KeySensor 的 bucket_key。
我有一个 dag "dag_trigger",它使用 TriggerDagRunOperator 为 dag "dag_triggered" 触发 dagrun。我正在尝试扩展示例 https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py。
所以我想发送一个变量给触发的 dag,根据变量的值我想在 S3KeySensor 任务中设置 backet_key 值。我知道如何在 PythonOperator 可调用函数中使用发送变量,但我不知道如何在传感器对象上使用它。
dag_trigger 日期:
import datetime
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
default_args = {
'owner': 'airflow',
'start_date': datetime.datetime.now()}
dag = DAG('dag_trigger', default_args=default_args, schedule_interval="@hourly")
def task_1_run(context, dag_run_object):
sent_variable = '2018_02_19' # not important
dag_run_object.payload = {'message': sent_variable}
print "DAG dag_trigger triggered with payload: %s" % dag_run_object.payload)
return dag_run_object
task_1 = TriggerDagRunOperator(task_id="task_1",
trigger_dag_id="dag_triggered",
provide_context=True,
python_callable=task_1_run,
dag=dag)
和 dag_triggered dag:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import S3KeySensor
default_args = {
'owner': 'airflow',
'start_date': datetime.datetime.now()
}
dag = DAG('dag_triggered', default_args=default_args, schedule_interval=None)
wait_files_to_arrive_task = S3KeySensor(
task_id='wait_file_to_arrive',
bucket_key='file_%s' % '', # Here I want to place conf['sent_variable']
wildcard_match=True,
bucket_name='test-bucket',
s3_conn_id='test_s3_conn',
timeout=18*60*60,
poke_interval=120,
dag=dag)
我尝试使用 dag.get_dagrun().conf['sent_variable'] 从 dag 对象中获取值,但我对如何设置 dagrun create_date 变量(dag_trigger 将每小时触发 dag_triggered 并且 dag_triggered 可以等待更长的时间来获取文件)。
我还尝试创建 PythonOperator,它将成为 wait_files_to_arrive_task 的上游。可调用 python 函数可以获得有关 sent_variable 的信息。之后,我尝试为 bucket_key 设置值,例如 bucket_key = callable_function() - 但我对参数有疑问。
而且我也觉得全局变量不是很好的解决方法
也许有人有可行的想法。
无法直接在 DAG 文件中获取 DAG 运行 conf 中的值。如果没有它所属的 DAG 运行 的上下文,这是无法确定的。一种思考方式是,当您 运行 python my_dag.py
测试您的 DAG 文件是否编译时,它必须初始化所有这些运算符,而无需指定执行日期。所以任何可能与 DAG 运行 不同的东西都不能直接引用。
因此,您可以将其作为模板值传递,稍后当任务实际执行时,该值将与上下文一起呈现 运行。
wait_files_to_arrive_task = S3KeySensor(
task_id='wait_file_to_arrive',
bucket_key='file_{{ dag_run.conf["message"] }}',
...)
请注意,只会呈现运算符 template_fields
中列出的参数。幸运的是有人预料到了这一点,所以 bucket_key 确实是一个模板字段。
我正在尝试根据 dagrun 输入变量设置 S3KeySensor 的 bucket_key。 我有一个 dag "dag_trigger",它使用 TriggerDagRunOperator 为 dag "dag_triggered" 触发 dagrun。我正在尝试扩展示例 https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py。
所以我想发送一个变量给触发的 dag,根据变量的值我想在 S3KeySensor 任务中设置 backet_key 值。我知道如何在 PythonOperator 可调用函数中使用发送变量,但我不知道如何在传感器对象上使用它。
dag_trigger 日期:
import datetime
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
default_args = {
'owner': 'airflow',
'start_date': datetime.datetime.now()}
dag = DAG('dag_trigger', default_args=default_args, schedule_interval="@hourly")
def task_1_run(context, dag_run_object):
sent_variable = '2018_02_19' # not important
dag_run_object.payload = {'message': sent_variable}
print "DAG dag_trigger triggered with payload: %s" % dag_run_object.payload)
return dag_run_object
task_1 = TriggerDagRunOperator(task_id="task_1",
trigger_dag_id="dag_triggered",
provide_context=True,
python_callable=task_1_run,
dag=dag)
和 dag_triggered dag:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import S3KeySensor
default_args = {
'owner': 'airflow',
'start_date': datetime.datetime.now()
}
dag = DAG('dag_triggered', default_args=default_args, schedule_interval=None)
wait_files_to_arrive_task = S3KeySensor(
task_id='wait_file_to_arrive',
bucket_key='file_%s' % '', # Here I want to place conf['sent_variable']
wildcard_match=True,
bucket_name='test-bucket',
s3_conn_id='test_s3_conn',
timeout=18*60*60,
poke_interval=120,
dag=dag)
我尝试使用 dag.get_dagrun().conf['sent_variable'] 从 dag 对象中获取值,但我对如何设置 dagrun create_date 变量(dag_trigger 将每小时触发 dag_triggered 并且 dag_triggered 可以等待更长的时间来获取文件)。
我还尝试创建 PythonOperator,它将成为 wait_files_to_arrive_task 的上游。可调用 python 函数可以获得有关 sent_variable 的信息。之后,我尝试为 bucket_key 设置值,例如 bucket_key = callable_function() - 但我对参数有疑问。
而且我也觉得全局变量不是很好的解决方法
也许有人有可行的想法。
无法直接在 DAG 文件中获取 DAG 运行 conf 中的值。如果没有它所属的 DAG 运行 的上下文,这是无法确定的。一种思考方式是,当您 运行 python my_dag.py
测试您的 DAG 文件是否编译时,它必须初始化所有这些运算符,而无需指定执行日期。所以任何可能与 DAG 运行 不同的东西都不能直接引用。
因此,您可以将其作为模板值传递,稍后当任务实际执行时,该值将与上下文一起呈现 运行。
wait_files_to_arrive_task = S3KeySensor(
task_id='wait_file_to_arrive',
bucket_key='file_{{ dag_run.conf["message"] }}',
...)
请注意,只会呈现运算符 template_fields
中列出的参数。幸运的是有人预料到了这一点,所以 bucket_key 确实是一个模板字段。