如何在 Airflow 中创建一个 DAG,它将显示 Docker 容器的使用情况?
How to create a DAG in Airflow, which will display the usage of a Docker container?
我目前正在使用 Airflow(版本:1.10.10),
我有兴趣创建一个 DAG,运行 每小时,
这将带来 Docker 容器的使用信息(磁盘使用情况),
(可通过 docker CLI 命令 (df -h) 获得的信息)。
我了解到:
“如果 xcom_push 为 True,则当 bash 命令完成时,写入 stdout 的最后一行也将被推送到 XCom”
但我的目标是从 bash 命令中获取特定值,
不是最后一行写的。
例如,
我想要这条线(见截图)
"tmpfs 6.2G 0 6.2G 0% /sys/fs/cgroup"
进入我的 Xcom 值,这样我就可以编辑并从中提取特定值,
如何将 Xcom 值推送到 PythonOperator,以便我可以对其进行编辑?
我在下面添加我的示例 DAG 脚本,
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
bash_task = BashOperator(task_id='bash_task', bash_command="df -h", xcom_push=True)
bash_task
适用吗?
非常感谢,
你应该做这项工作:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
bash_task = BashOperator(task_id='bash_task', bash_command="docker stats --no-stream --format '{{ json .}}' <container-id>", xcom_push=True)
bash_task
您可以通过运算符的 output
属性检索推送到 XCom 存储的值。
在下面的代码片段中,bash.output
是一个 XComArg,在执行任务实例时将被拉取并作为可调用函数的第一个参数传递。
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models import DAG
with DAG(dag_id='bash_dag') as dag:
bash_task = BashOperator(
task_id='bash_task', bash_command="df -h", xcom_push=True)
def format_fun(stat_terminal_output):
pass
format_task = PythonOperator(
python_callable=format_fun,
task_id="format_task",
op_args=[bash_task.output],
)
bash_task >> format_task
我目前正在使用 Airflow(版本:1.10.10),
我有兴趣创建一个 DAG,运行 每小时,
这将带来 Docker 容器的使用信息(磁盘使用情况),
(可通过 docker CLI 命令 (df -h) 获得的信息)。
我了解到: “如果 xcom_push 为 True,则当 bash 命令完成时,写入 stdout 的最后一行也将被推送到 XCom”
但我的目标是从 bash 命令中获取特定值, 不是最后一行写的。
例如, 我想要这条线(见截图)
"tmpfs 6.2G 0 6.2G 0% /sys/fs/cgroup"
进入我的 Xcom 值,这样我就可以编辑并从中提取特定值,
如何将 Xcom 值推送到 PythonOperator,以便我可以对其进行编辑?
我在下面添加我的示例 DAG 脚本,
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
bash_task = BashOperator(task_id='bash_task', bash_command="df -h", xcom_push=True)
bash_task
适用吗? 非常感谢,
你应该做这项工作:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
bash_task = BashOperator(task_id='bash_task', bash_command="docker stats --no-stream --format '{{ json .}}' <container-id>", xcom_push=True)
bash_task
您可以通过运算符的 output
属性检索推送到 XCom 存储的值。
在下面的代码片段中,bash.output
是一个 XComArg,在执行任务实例时将被拉取并作为可调用函数的第一个参数传递。
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models import DAG
with DAG(dag_id='bash_dag') as dag:
bash_task = BashOperator(
task_id='bash_task', bash_command="df -h", xcom_push=True)
def format_fun(stat_terminal_output):
pass
format_task = PythonOperator(
python_callable=format_fun,
task_id="format_task",
op_args=[bash_task.output],
)
bash_task >> format_task