Airflow 不启动我的 dag(激活 python 环境并启动 python 脚本)
Airflow does not launch my dags (activate a python environment and launch a python script)
我想安排一个 Python 作业每 15 分钟启动一次。我已经使用过气流并且之前没有遇到任何问题。
我创建了一个开始日期较早的 dag,频率为 15 分钟和两个任务,其中包括激活虚拟 Python 环境,然后启动 python 脚本。
然而,我的 dags 并没有自行执行,所以我启动了一个网络服务器来检查它的状态,但没有任何反应。因此,我尝试使用 trigger_dag 命令从外部启动它,但它的状态仍然是 运行。我真的不明白问题是什么,任何帮助将不胜感激。我附上了两个显示问题的 Airflow 网络服务器屏幕截图。
编辑:添加了 dags.py 文件,这里是我的 dags 的定义:
import os
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2017,10,13,0,0,0,0),
'email': ['test@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
#'end_date': datetime(2017, 9, 23),
}
dag = DAG('dbscan_integ', default_args=default_args)
t_dbscan = BashOperator(
task_id='job_batch_dbscan',
bash_command='/home/test/Documents/git_repo/analyser/algo_integ/integration_dbscan/python main_algo.py',
dag=dag)
t_virtual_dbscan = BashOperator(
task_id='virtual_dbscan',
bash_command='source activate integdb',
dag=dag)
t_dbscan.set_upstream(t_virtual_dbscan)
我知道这很愚蠢,但是你有调度器和工作器吗运行?
编辑 1:
好的,我认为原因是你的 dag 中没有 schedule_interval,而是将 timedelta(minutes=15) 给 retry_delay。
我想安排一个 Python 作业每 15 分钟启动一次。我已经使用过气流并且之前没有遇到任何问题。 我创建了一个开始日期较早的 dag,频率为 15 分钟和两个任务,其中包括激活虚拟 Python 环境,然后启动 python 脚本。
然而,我的 dags 并没有自行执行,所以我启动了一个网络服务器来检查它的状态,但没有任何反应。因此,我尝试使用 trigger_dag 命令从外部启动它,但它的状态仍然是 运行。我真的不明白问题是什么,任何帮助将不胜感激。我附上了两个显示问题的 Airflow 网络服务器屏幕截图。
编辑:添加了 dags.py 文件,这里是我的 dags 的定义:
import os
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2017,10,13,0,0,0,0),
'email': ['test@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
#'end_date': datetime(2017, 9, 23),
}
dag = DAG('dbscan_integ', default_args=default_args)
t_dbscan = BashOperator(
task_id='job_batch_dbscan',
bash_command='/home/test/Documents/git_repo/analyser/algo_integ/integration_dbscan/python main_algo.py',
dag=dag)
t_virtual_dbscan = BashOperator(
task_id='virtual_dbscan',
bash_command='source activate integdb',
dag=dag)
t_dbscan.set_upstream(t_virtual_dbscan)
我知道这很愚蠢,但是你有调度器和工作器吗运行?
编辑 1:
好的,我认为原因是你的 dag 中没有 schedule_interval,而是将 timedelta(minutes=15) 给 retry_delay。