如何在 Airflow 中创建一个 DAG,它将显示 Docker 容器的使用情况?

How to create a DAG in Airflow, which will display the usage of a Docker container?

我目前正在使用 Airflow(版本:1.10.10),

我有兴趣创建一个 DAG,运行 每小时,

这将带来 Docker 容器的使用信息(磁盘使用情况),

(可通过 docker CLI 命令 (df -h) 获得的信息)。

我了解到: “如果 xcom_push 为 True,则当 bash 命令完成时,写入 stdout 的最后一行也将被推送到 XCom”

但我的目标是从 bash 命令中获取特定值, 不是最后一行写的。

例如, 我想要这条线(见截图)

"tmpfs 6.2G 0 6.2G 0% /sys/fs/cgroup"

进入我的 Xcom 值,这样我就可以编辑并从中提取特定值,

如何将 Xcom 值推送到 PythonOperator,以便我可以对其进行编辑?

我在下面添加我的示例 DAG 脚本,

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = { 
    'retry': 5,
    'retry_delay': timedelta(minutes=5) 
}

with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
    bash_task = BashOperator(task_id='bash_task', bash_command="df -h", xcom_push=True)
bash_task

适用吗? 非常感谢,

你应该做这项工作:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = { 
    'retry': 5,
    'retry_delay': timedelta(minutes=5) 
}

with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
    bash_task = BashOperator(task_id='bash_task', bash_command="docker stats --no-stream --format '{{ json .}}' <container-id>", xcom_push=True)
bash_task

您可以通过运算符的 output 属性检索推送到 XCom 存储的值。

在下面的代码片段中,bash.output 是一个 XComArg,在执行任务实例时将被拉取并作为可调用函数的第一个参数传递。

from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models import DAG

with DAG(dag_id='bash_dag') as dag:

    bash_task = BashOperator(
        task_id='bash_task', bash_command="df -h", xcom_push=True)

    def format_fun(stat_terminal_output):
        pass

    format_task = PythonOperator(
                python_callable=format_fun,
                task_id="format_task",
                op_args=[bash_task.output],
            )

    bash_task >> format_task