如何在自定义 docker 图像上使用 CeleryExecutor 运行 气流
How to run airflow with CeleryExecutor on a custom docker image
我正在向 Web 应用程序添加气流,该应用程序手动将包含业务逻辑的目录添加到 PYTHON_PATH
env var,以及我希望在所有服务器中保持一致的其他系统级设置我的集群。我已经成功地 运行 这个应用程序的 celery 与 RMQ 作为代理和 redis 作为任务结果后端一段时间,并且之前有经验 运行 Airflow LocalExecutor
.
我没有使用 Pukel 的图像,而是有一个基本 backend
图像的入口点,该图像运行基于 SERVICE
环境变量的不同服务。看起来像这样:
if [ $SERVICE == "api" ]; then
# upgrade to the data model
flask db upgrade
# start the web application
python wsgi.py
fi
if [ $SERVICE == "worker" ]; then
celery -A tasks.celery.celery worker --loglevel=info --uid=nobody
fi
if [ $SERVICE == "scheduler" ]; then
celery -A tasks.celery.celery beat --loglevel=info
fi
if [ $SERVICE == "airflow" ]; then
airflow initdb
airflow scheduler
airflow webserver
我有一个 .env
文件,我用定义的气流参数构建容器:
AIRFLOW_HOME=/home/backend/airflow
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+pymysql://${MYSQL_USER}:${MYSQL_ROOT_PASSWORD}@${MYSQL_HOST}:${MYSQL_PORT}/airflow?charset=utf8mb4
AIRFLOW__CELERY__BROKER_URL=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@${RABBITMQ_HOST}:5672
AIRFLOW__CELERY__RESULT_BACKEND=redis://${REDIS_HOST}
根据我当前的入口点设置方式,它无法到达 webserver
。相反,它通过调用 Web 服务器在前台运行 scheduler
。我可以将其更改为
airflow initdb
airflow scheduler -D
airflow webserver
现在网络服务器运行,但它不知道调度程序,它现在 运行 作为守护进程:
然而,Airflow 确实知道我正在使用 CeleryExecutor
并在正确的位置寻找 dags:
airflow | [2020-07-29 21:48:35,006] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
airflow | [2020-07-29 21:48:35,010] {__init__.py:50} INFO - Using executor CeleryExecutor
airflow | [2020-07-29 21:48:35,010] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags
airflow | [2020-07-29 21:48:35,113] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
我可以通过进入容器并手动启动调度程序来解决这个问题:
诀窍似乎是 运行 两个进程都在容器内的前台运行,但我仍然不知道如何在入口点内执行此操作。我已经检查了 Pukel 的入口点代码,但我并不清楚他在做什么。我敢肯定,只需稍作调整,比赛就会结束……在此先感谢您的帮助。此外,如果有任何主要的反模式我有 运行 进入这里的风险,我很乐意获得反馈,以便我可以正确地实施气流。这是我第一次实施 CeleryExecutor
,并且涉及的数量可观。
尝试使用 nohup。 https://en.wikipedia.org/wiki/Nohup
nohup airflow scheduler >scheduler.log &
在您的情况下,您将按如下方式更新入口点:
if [ $SERVICE == "airflow" ]; then
airflow initdb
nohup airflow scheduler > scheduler.log &
nohup airflow webserver
fi
我正在向 Web 应用程序添加气流,该应用程序手动将包含业务逻辑的目录添加到 PYTHON_PATH
env var,以及我希望在所有服务器中保持一致的其他系统级设置我的集群。我已经成功地 运行 这个应用程序的 celery 与 RMQ 作为代理和 redis 作为任务结果后端一段时间,并且之前有经验 运行 Airflow LocalExecutor
.
我没有使用 Pukel 的图像,而是有一个基本 backend
图像的入口点,该图像运行基于 SERVICE
环境变量的不同服务。看起来像这样:
if [ $SERVICE == "api" ]; then
# upgrade to the data model
flask db upgrade
# start the web application
python wsgi.py
fi
if [ $SERVICE == "worker" ]; then
celery -A tasks.celery.celery worker --loglevel=info --uid=nobody
fi
if [ $SERVICE == "scheduler" ]; then
celery -A tasks.celery.celery beat --loglevel=info
fi
if [ $SERVICE == "airflow" ]; then
airflow initdb
airflow scheduler
airflow webserver
我有一个 .env
文件,我用定义的气流参数构建容器:
AIRFLOW_HOME=/home/backend/airflow
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+pymysql://${MYSQL_USER}:${MYSQL_ROOT_PASSWORD}@${MYSQL_HOST}:${MYSQL_PORT}/airflow?charset=utf8mb4
AIRFLOW__CELERY__BROKER_URL=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@${RABBITMQ_HOST}:5672
AIRFLOW__CELERY__RESULT_BACKEND=redis://${REDIS_HOST}
根据我当前的入口点设置方式,它无法到达 webserver
。相反,它通过调用 Web 服务器在前台运行 scheduler
。我可以将其更改为
airflow initdb
airflow scheduler -D
airflow webserver
现在网络服务器运行,但它不知道调度程序,它现在 运行 作为守护进程:
然而,Airflow 确实知道我正在使用 CeleryExecutor
并在正确的位置寻找 dags:
airflow | [2020-07-29 21:48:35,006] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
airflow | [2020-07-29 21:48:35,010] {__init__.py:50} INFO - Using executor CeleryExecutor
airflow | [2020-07-29 21:48:35,010] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags
airflow | [2020-07-29 21:48:35,113] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
我可以通过进入容器并手动启动调度程序来解决这个问题:
诀窍似乎是 运行 两个进程都在容器内的前台运行,但我仍然不知道如何在入口点内执行此操作。我已经检查了 Pukel 的入口点代码,但我并不清楚他在做什么。我敢肯定,只需稍作调整,比赛就会结束……在此先感谢您的帮助。此外,如果有任何主要的反模式我有 运行 进入这里的风险,我很乐意获得反馈,以便我可以正确地实施气流。这是我第一次实施 CeleryExecutor
,并且涉及的数量可观。
尝试使用 nohup。 https://en.wikipedia.org/wiki/Nohup
nohup airflow scheduler >scheduler.log &
在您的情况下,您将按如下方式更新入口点:
if [ $SERVICE == "airflow" ]; then
airflow initdb
nohup airflow scheduler > scheduler.log &
nohup airflow webserver
fi