Apache Airflow 卡在执行最后一个任务的循环中(bash 运算符执行 python 脚本)

Apache Airflow stuck in a loop executing last task (bash operator executing a python script)

我在我本地机器上的 docker 容器中 运行ning Airflow。我正在 运行 测试 DAG 执行 3 个任务。这三个任务 运行 很好,但是,最后一个带有 bash 运算符的任务卡在了一个循环中,如底部图片所示。查看日志文件,只为第一次执行 bash python 脚本生成一个条目,然后什么都没有,但是 python 文件一直在执行。关于可能是什么问题的任何建议?

谢谢,

理查德

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def creating_dataframe(ti):
    import pandas as pd
    import os

    loc = r'/opt/airflow/dags/'
    filename = r'demo.csv'
    df_location = loc + filename
    ti.xcom_push(key='df_location', value=df_location)

    if os.path.exists(loc + filename):
        print("if exists")
        return df_location
    
    else:
        df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['First entry']},
                      index = [pd.Timestamp.now()])
        df.to_csv(loc + filename, sep=';')
        print("does not exist")
    
    return df_location


def adding_row_to_dataframe(ti):
    import pandas as pd
    fetched_location = ti.xcom_pull(key='df_location', task_ids=['creating_dataframe'])[0]


    df = pd.read_csv(fetched_location,index_col=0,sep=';')   
    new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry to demo file']},
                      index = [pd.Timestamp.now()])
    df2 = pd.concat([df,new_df])
    df2.to_csv(fetched_location,sep=";")
    print("second function")

with DAG(
    dag_id="richards_airflow_demo",
    schedule_interval="@once",
    start_date=datetime(2022, 2, 17 ),
    catchup=False,
    tags=["this is a demo of airflow","adding row"],
) as dag:

    task1 = PythonOperator(
        task_id="creating_dataframe",
        python_callable=creating_dataframe,
        do_xcom_push=True
    )


    task2 = PythonOperator(
        task_id='adding_row_to_dataframe',
        python_callable=adding_row_to_dataframe


    )

    task3 = BashOperator(
        task_id='python_bash_script',
    bash_command=r"echo 'python /opt/scripts/test.py'"
    )
    

    task1 >> task2 >> task3

Bash python 脚本:

import pandas as pd

df = pd.read_csv('/opt/airflow/dags/demo.csv',index_col=0,sep=';')   
new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry with bash python script']},
                      index = [pd.Timestamp.now()])
df2 = pd.concat([df,new_df])

df2.to_csv('/opt/airflow/dags/demo.csv',sep=';')

Example of issue Log file for bashoperator

好吧,没有研究为什么会这样,但似乎如果我在 dags 文件夹中创建一个脚本文件夹,里面的 python 脚本 (test_dontputthescripthere.py)即使 bash 运算符没有告诉它执行,它也会被执行。如您所见,bashoperator 正在完美地执行 test.py 文件,并将以下行添加到 csv:

2022-02-21 15:11:53.923284;使用 bash 添加条目 python 脚本

test_dontputthescripthere.py 循环执行,并且没有 bash 运算符执行文件。这是 demo.csv 文件中的所有“- 这是错误的”条目。

我怀疑气流内部正在进行某种刷新,迫使它执行 python 文件。