如何使用作曲家 DAG 递归地从 GCP 存储桶中读取文件名

How to read file names from GCP buckets recursively using a composer DAG

我正在尝试使用作曲家 DAG 从存储桶下的所有文件夹、子文件夹递归地读取 GCS 存储桶中的文件名。可能吗?例如,我有一个包含相应文件夹和子文件夹的存储桶,如下所述。静态是存储桶名称。

static/folder1/subfolder1/file1.json static/folder1/subfolder2/file2.json static/folder1/subfolder3/file3.json static/folder1/subfolder3/file4.json

我想递归读取文件并将数据放入如下两个变量中。

桶名=静态 文件路径 = static/folder1/subfolder3/file4.json

您可以使用 Airflow 的 BashOperator 来使用 GCS CLI 工具 (docs here)。

示例如下:

read_files = BashOperator(
    task_id='read_files',
    bash_command='gsutil ls -r gs://bucket',
    dag=dag,
)

编辑:由于您想捕获输出并且 BashOperator 仅将标准输出的最后一行推送到 XCom,我建议使用 PythonOperator 调用自定义 Python可调用的,它使用 GCS API 甚至通过 subprocess 的 CLI 工具来收集所有文件名并将其推送到 XCom 供下游任务后续使用。除非您根本不需要其他任务来使用这些数据,在这种情况下,您可以随意处理它(问题中不清楚)。