Airflow task triggers successfully but no file uploaded
I'm trying to run a simple ETL DAG with Airflow. Airflow shows that the DAG was triggered successfully, but nothing is uploaded to my computer. When I run the function on its own, it works fine.
This is the ETL function that extracts data from the API, transforms it, and loads it into a SQLite database file:
from sqlite3.dbapi2 import Cursor
import requests
import pandas as pd
from datetime import datetime
import datetime
import sqlalchemy
import sqlite3
from sqlalchemy.orm import sessionmaker

pd.options.mode.chained_assignment = None


def run_activity_etl():
    DATABASE_LOCATION = "sqlite:///run_activity.sqlite"
    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=1)
    yesterday = yesterday.strftime("%Y-%m-%d")

    # extract
    access_token = "****"  # not relevant for my issue
    header = {'Authorization': 'Bearer {}'.format(access_token)}
    response = requests.get("https://api.fitbit.com/1/user/-/activities/list.json?afterDate=2021-07-01&sort=asc&offset=0&limit=100", headers=header).json()
    activity_data = pd.json_normalize(response['activities'], sep="_")

    # transform
    subset = ['startTime', 'activityName', 'distance', 'duration', 'speed', 'averageHeartRate', 'calories', 'steps']
    subset_activity_data = activity_data[subset]
    subset_run = subset_activity_data[subset_activity_data['activityName'] == 'Run']
    subset_run["startTime"] = pd.to_datetime(subset_run["startTime"])
    subset_run["date"] = subset_run["startTime"].dt.strftime("%Y-%m-%d")

    # load
    engine = sqlalchemy.create_engine(DATABASE_LOCATION)
    conn = sqlite3.connect('run_activity.sqlite')
    cursor = conn.cursor()
    sql_query = """
    CREATE TABLE IF NOT EXISTS run_activity(
        date VARCHAR(200),
        activityName VARCHAR(200),
        distance VARCHAR(200),
        duration VARCHAR(200),
        speed VARCHAR(200),
        averageHeartRate VARCHAR(200),
        calories VARCHAR(200),
        steps VARCHAR(200),
        startTime VARCHAR(200),
        CONSTRAINT primary_key_constraint PRIMARY KEY (startTime)
    )
    """
    cursor.execute(sql_query)
    print("Opened database successfully")
    try:
        subset_run.to_sql("run_activity", engine, index=False, if_exists='append')
    except:
        print("Data already exists in the database")
    conn.close()
    print("Close database successfully")
My DAG file:
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from fitbit_api import run_activity_etl

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(0, 0, 0, 0, 0),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'activity_dag',
    default_args=default_args,
    description='ETL process for Fitbit on running data!',
    schedule_interval=timedelta(days=1),
)

run_etl = PythonOperator(
    task_id='whole_activity_etl',
    python_callable=run_activity_etl,
    dag=dag,
)

run_etl
Here is the log from Airflow:
[Log from Airflow]
Any help is appreciated!
At the moment you have the following:
try:
    subset_run.to_sql("run_activity", engine, index=False, if_exists='append')
except:
    print("Data already exists in the database")
This means any exception that occurs is silently swallowed. If you remove the try/except and just run
subset_run.to_sql("run_activity", engine, index=False, if_exists='append')
you will probably see the actual exception. If you post it here, we can help you further.
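If you do want to keep the error handling, a minimal sketch (assuming the only failure you expect to ignore is a duplicate startTime hitting the primary key) would be to catch that specific error and log everything else with its traceback, so real problems still show up in the Airflow task log:

import logging
from sqlalchemy.exc import IntegrityError

try:
    subset_run.to_sql("run_activity", engine, index=False, if_exists='append')
except IntegrityError:
    # Rows whose startTime already exists violate the primary key constraint
    logging.info("Data already exists in the database")
except Exception:
    # Anything else is a real failure; log the traceback and fail the task
    logging.exception("Loading run_activity failed")
    raise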
I still have a lot to learn, but I found the solution to my problem. Apparently the database location should be sqlite:////opt/airflow/dags/run_activity.sqlite
in my case, because that is how the directory is mounted in my docker-compose file:
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
Now the file shows up without any problem.
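For anyone hitting the same issue, a minimal sketch of the adjusted load section, assuming the same docker-compose volume mapping as above (the four slashes in sqlite://// make the path after the scheme absolute inside the container):

DATABASE_LOCATION = "sqlite:////opt/airflow/dags/run_activity.sqlite"

engine = sqlalchemy.create_engine(DATABASE_LOCATION)
# sqlite3.connect must point at the same absolute path, otherwise the
# CREATE TABLE and the pandas to_sql call would target two different files
conn = sqlite3.connect('/opt/airflow/dags/run_activity.sqlite')

With the original relative path the file was most likely still being created, just in the worker's working directory inside the container, which is not mounted to the host, so it never appeared locally.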