How do I use a list of table names in a for loop to create tasks for all the tables in an Airflow DAG?

I am trying to access tables in MySQL through a for loop in my DAG. I pass the table names in a list, with the values coming from Airflow Variables. The code runs without errors, but the problem is in the for loop: I don't get results for all the tables, only for the last table in the list.

I can't see where I went wrong. Can anyone point me to the problem?

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.transfers.mysql_to_s3 import MySQLToS3Operator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator
from airflow.models import Variable


dag = DAG(
    'Multi_Incremental_Export',
    start_date=datetime(2021, 12, 5),
    default_args={'mysql_conn_id': 'mysql_connection','provide_context' : True},
    catchup=False
)

tab1 = Variable.get('table1')

tab2 = Variable.get('table2')

tab3 = Variable.get('table3')

tables = [tab1,tab2,tab3]

start = DummyOperator(task_id='dummy_task1', retries=2, dag=dag)

def max_source(**kwargs):
    # NOTE: reads the module-level 'tab' that the for loop below keeps rebinding
    request = f"select cast(max(created_at) as char) from {tab};"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    return cursor.fetchone()[0]

def max_metatbl(**kwargs):
    # NOTE: reads the module-level 'meta_tbl' set by the for loop below
    request = f"select coalesce(max(created_at),'2021-12-05 00:00:00') from {meta_tbl};"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    return cursor.fetchone()[0]

def transfer(**kwargs):
    ti = kwargs['ti']
    meta_val = ti.xcom_pull(task_ids=max_meta)
    trns = MySQLToS3Operator(
        task_id='mysql_task',
        query=f"select * from Employee where created_at > '{meta_val}';",
        s3_bucket='mydta',
        s3_key=f'{tab}-{val}.csv',
        mysql_conn_id='mysql_connection',
        aws_conn_id='aws_connection',
        file_format='csv',
        pd_kwargs={'header': False}
    )
    trns.execute(dict())

def mx_update(**kwargs):
    ti = kwargs['ti']
    meta_max = ti.xcom_pull(task_ids=mx_src)
    request = f"insert into {meta_tbl} values('{meta_max}');"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    connection.commit()

# NOTE: these module-level names are rebound on every iteration; the callables
# above only read them at task-execution time, so every task ends up seeing
# the values from the last table in the list.
for tab in tables:
    meta_tbl = f'{tab}_Metatable'
    max_meta = f'{tab}_maxmeta'
    meta_updt = f'{tab}_metaupdate'
    mx_src = f'{tab}_maxsrc'
    mysql_task = f'{tab}_mysql_task'
    val = datetime.now().strftime("%m-%d-%Y, %H.%M")

    t1 = PythonOperator(
        task_id=mx_src,
        python_callable=max_source,
        provide_context=True,
        dag=dag
    )

    t2 = PythonOperator(
        task_id=max_meta,
        python_callable=max_metatbl,
        provide_context=True,
        dag=dag
    )

    t3 = PythonOperator(
        task_id=mysql_task,
        python_callable=transfer,
        provide_context=True,
        dag=dag
    )

    t4 = PythonOperator(
        task_id=meta_updt,
        python_callable=mx_update,
        provide_context=True,
        dag=dag
    )

    start >> t1 >> t2 >> t3 >> t4

I finally got the correct output. The change I made was to pass the table name to each PythonOperator as an argument via op_kwargs, and to add a matching parameter to the called functions. The original version failed because the functions read module-level variables that the for loop kept rebinding; the callables only run at task-execution time, so every task saw the values from the last iteration.
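For anyone hitting the same issue, here is a minimal, Airflow-free sketch of the pitfall (the table names are made up): a function that reads a loop variable from the enclosing scope sees whatever that variable holds when the function is called, not when it was defined.

tables = ['Employee', 'Student', 'Staff']

deferred = []
for tab in tables:
    def report():
        # 'tab' is looked up when report() runs, not when it was defined
        return f'processing {tab}'
    deferred.append(report)

print([fn() for fn in deferred])
# ['processing Staff', 'processing Staff', 'processing Staff']

# Binding the current value per iteration (which is what op_kwargs does for
# a PythonOperator) restores the expected behaviour:
bound = []
for tab in tables:
    def report(tab=tab):  # default argument captures this iteration's value
        return f'processing {tab}'
    bound.append(report)

print([fn() for fn in bound])
# ['processing Employee', 'processing Student', 'processing Staff']

The corrected DAG: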

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.transfers.mysql_to_s3 import MySQLToS3Operator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator


dag = DAG(
    'Multi_Incremental_Export',
    start_date=datetime(2021, 12, 5),
    default_args={'mysql_conn_id': 'mysql_connection','provide_context' : True},
    catchup=False
)

start = DummyOperator(task_id='dummy_task1', retries=2, dag=dag)

def max_source(tab, **kwargs):
    # latest created_at in the source table
    request = f"select cast(max(created_at) as char) from {tab};"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    return cursor.fetchone()[0]

def max_metatbl(tab, **kwargs):
    # last exported watermark, falling back to the initial load date
    meta_tbl = f'{tab}_Metatable'
    request = f"select coalesce(max(created_at),'2021-12-05 00:00:00') from {meta_tbl};"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    return cursor.fetchone()[0]

def transfer(tab, **kwargs):
    ti = kwargs['ti']
    # pull the watermark returned by this table's max_metatbl task
    meta_val = ti.xcom_pull(task_ids=f'{tab}_maxmeta')
    val = datetime.now().strftime("%m-%d-%Y, %H.%M")
    trns = MySQLToS3Operator(
        task_id='mysql_task',
        query=f"select * from {tab} where created_at > '{meta_val}';",
        s3_bucket='mydta',
        s3_key=f'{tab}-{val}.csv',
        mysql_conn_id='mysql_connection',
        aws_conn_id='aws_connection',
        file_format='csv',
        pd_kwargs={'header': False}
    )
    trns.execute(dict())

def mx_update(tab, **kwargs):
    ti = kwargs['ti']
    # record the new watermark pulled from this table's max_source task
    meta_max = ti.xcom_pull(task_ids=f'{tab}_maxsrc')
    meta_tbl = f'{tab}_Metatable'
    request = f"insert into {meta_tbl} values('{meta_max}');"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    connection.commit()

tables = ['Employee','Student','Staff','Teachers']

for tab in tables:
    max_meta = f'{tab}_maxmeta'
    meta_updt = f'{tab}_metaupdate'
    mx_src = f'{tab}_maxsrc'
    mysql_task = f'{tab}_mysql_task'

    # op_kwargs binds this iteration's table name to each task, so the
    # callables no longer depend on module-level state
    t1 = PythonOperator(
        task_id=mx_src,
        python_callable=max_source,
        op_kwargs={"tab": tab},
        provide_context=True,
        dag=dag
    )

    t2 = PythonOperator(
        task_id=max_meta,
        python_callable=max_metatbl,
        op_kwargs={"tab": tab},
        provide_context=True,
        dag=dag
    )

    t3 = PythonOperator(
        task_id=mysql_task,
        python_callable=transfer,
        op_kwargs={"tab": tab},
        provide_context=True,
        dag=dag
    )

    t4 = PythonOperator(
        task_id=meta_updt,
        python_callable=mx_update,
        op_kwargs={"tab": tab},
        provide_context=True,
        dag=dag
    )

    start >> t1 >> t2 >> t3 >> t4
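A variation worth noting: instead of op_kwargs, the current table name can be bound to each callable with functools.partial. This is only a sketch reusing the names from the DAG above (max_source, tables, dag), not something the original post tested:

from functools import partial

for tab in tables:
    t1 = PythonOperator(
        task_id=f'{tab}_maxsrc',
        # partial pins this iteration's value of 'tab' to the callable
        python_callable=partial(max_source, tab),
        provide_context=True,
        dag=dag
    )

Either way, the key point is the same: bind the per-iteration value to the task at definition time rather than letting the callable read a module-level variable at execution time.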