When running Apache Airflow in Docker, how can I fix the issue where my DAGs stay broken even after I fix them?


So in my case, I was previously running Airflow locally, directly on my machine, and now I'm trying to run it in containers with Docker while keeping the history of my previous DAGs. But I'm running into some problems.
A bit of background... When I first used docker-compose to bring up my containers, Airflow threw an error saying that the column dag_has_import_errors doesn't exist. So I just went ahead and created it, and everything seemed fine.
But now my DAGs are all broken, and when I modify one without actually fixing the problem, I can see the updated line of code in the short error message shown at the top of the webserver.
However, when I do fix the problem, the code shown there doesn't change and the DAG stays broken. I'll provide
this image of the error
this is the image of the code

Below is also my docker-compose file (I commented out airflow db init, but should I keep it in with the db upgrade argument instead? See the command sketch right after the compose file). My compose file is based on this template:

version: '3.1'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    # postgresql+psycopg2://postgres:airflow@localhost:5434/airflowdb
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:airflow@postgres:5434/airflowdb
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://postgres:airflow@postgres:5434/airflowdb
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflowdb
      PGPORT: 5434

    volumes:
      - pipeline-scripts_airflow-docker-db:/var/lib/postgresql/data
      # - postgres-db-volume:/var/lib/postgresql/data
    ports:
      - 5434:5434
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "postgres"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

#below here
  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

# volumes:
#   postgres-db-volume:
volumes: 
    pipeline-scripts_airflow-docker-db:
        external: true
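
On the airflow db init vs airflow db upgrade question above: since the metadata database already exists from the old local install, it only needs its schema migrated rather than re-initialised. A minimal sketch of running that once against the running stack, assuming the webserver container name that docker ps reports:

# apply any pending schema migrations to the existing metadata DB (non-destructive)
docker exec -it apache-airflow-airflow-webserver-1 airflow db upgrade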

The logs from my containers are also interesting; they are as follows:

apache-airflow-airflow-scheduler-1  | Process DagFileProcessor4728-Process:
apache-airflow-airflow-scheduler-1  | Traceback (most recent call last):
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
apache-airflow-airflow-scheduler-1  |     self.run()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
apache-airflow-airflow-scheduler-1  |     self._target(*self._args, **self._kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 168, in _run_file_processor
apache-airflow-airflow-scheduler-1  |     callback_requests=callback_requests,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, session=session, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 663, in process_file
apache-airflow-airflow-scheduler-1  |     dagbag.sync_to_db()
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, session=session, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 608, in sync_to_db
apache-airflow-airflow-scheduler-1  |     for attempt in run_with_db_retries(logger=self.log):
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 382, in __iter__
apache-airflow-airflow-scheduler-1  |     do = self.iter(retry_state=retry_state)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 349, in iter
apache-airflow-airflow-scheduler-1  |     return fut.result()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
apache-airflow-airflow-scheduler-1  |     return self.__get_result()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
apache-airflow-airflow-scheduler-1  |     raise self._exception
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 622, in sync_to_db
apache-airflow-airflow-scheduler-1  |     DAG.bulk_write_to_db(self.dags.values(), session=session)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2433, in bulk_write_to_db
apache-airflow-airflow-scheduler-1  |     most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2433, in <dictcomp>
apache-airflow-airflow-scheduler-1  |     most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 100, in instances
apache-airflow-airflow-scheduler-1  |     cursor.close()
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
apache-airflow-airflow-scheduler-1  |     with_traceback=exc_tb,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
apache-airflow-airflow-scheduler-1  |     raise exception
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 80, in instances
apache-airflow-airflow-scheduler-1  |     rows = [proc(row) for row in fetch]
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 80, in <listcomp>
apache-airflow-airflow-scheduler-1  |     rows = [proc(row) for row in fetch]
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 588, in _instance
apache-airflow-airflow-scheduler-1  |     populators,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 725, in _populate_full
apache-airflow-airflow-scheduler-1  |     dict_[key] = getter(row)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1723, in process
apache-airflow-airflow-scheduler-1  |     return loads(value)
apache-airflow-airflow-scheduler-1  | ValueError: unsupported pickle protocol: 5

I'm happy to provide any other information that's needed.
EDIT: A small update. I thought, why not go ahead and run docker exec -it apache-airflow-airflow-webserver-1 bash and then airflow db upgrade, since after all it's just alembic and it shouldn't delete my data.
After doing that, it added the missing columns on its own, like so. So now, when I look at the Postgres database I'm using, it shows dag.has_import_errors as false.
However, in the import_error table I still have the same problem: the DAGs are not being updated.
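
To see exactly what the scheduler still has recorded there, the import_error table can be queried directly. A sketch, assuming the Postgres container follows the same naming pattern as the other containers and the credentials from the compose file above:

# list the DAG files the scheduler still flags as broken and when they were last parsed
docker exec -it apache-airflow-postgres-1 \
  psql -U postgres -d airflowdb -c "SELECT filename, timestamp FROM import_error;"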

LET'S GO!
PAN COMIDO!
DU GATEAU!
Finally got it working :). So the main problem was that I didn't have all the packages I needed. I first tried just doing pip install configparser inside the container, which actually helped with one DAG I had to run. That didn't seem sustainable or practical, though, so I decided to go with the Dockerfile approach and extend the image (I believe that's what it's called). So here is my Dockerfile:

FROM apache/airflow:2.2.3-python3.8

COPY requirements.txt ./

RUN pip install -r requirements.txt

There are two important things about this Dockerfile. The first is that I of course installed the dependencies I might need, but some of them conflicted with Airflow's own dependencies, so I simply decided to remove those from my requirements.txt file.
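
A possible way to reduce those conflicts (a sketch based on Airflow's documented constraints mechanism, assuming Airflow 2.2.3 and Python 3.8 as in the Dockerfile above) is to install the requirements against the matching constraints file, so pip cannot move Airflow's own pinned dependencies:

# could go into the RUN step of the Dockerfile above, or be run interactively in the container
pip install -r requirements.txt \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.8.txt"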
The second thing is the python3.8 tag, which is what actually got rid of the ValueError: unsupported pickle protocol: 5 error that was preventing me from seeing my DAGs' history.
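
Pickle protocol 5 was only introduced in Python 3.8, and the traceback above shows the default 2.2.3 image running Python 3.7, so rows written into the metadata DB by the old local install (presumably on a newer Python) could no longer be unpickled. A quick way to confirm which interpreter a container is using, as a sketch:

# print the Python version and highest pickle protocol inside the scheduler container
docker exec -it apache-airflow-airflow-scheduler-1 \
  python -c "import sys, pickle; print(sys.version, pickle.HIGHEST_PROTOCOL)"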
Other problems I ran into were about getting files into the container, for example key files for the SSH operator, but that's another story :D.
Then, of course, you have to edit the docker-compose.yaml file as follows:

  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3-python3.8}
  build: .
  environment:
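
After switching from image: to build:, the extended image has to be rebuilt before the stack is restarted (as the template comment already notes). A minimal sketch:

# rebuild the extended image and recreate the services with it
docker-compose build
docker-compose up -d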

These changes solved most of the problems.
The only thing that still bothers me is that apache-airflow-airflow-webserver-1 shows up in red in the docker logs. I'm not sure whether that's normal, but apart from that everything works fine when I run docker ps.