使用 Apache Airflow 将文件从一个 Google 云存储桶复制到另一个

Copy files from one Google Cloud Storage Bucket to other using Apache Airflow

问题:我想将文件从 Google Cloud Storage Bucket 中的文件夹(例如 Bucket1 中的 Folder1)复制到另一个 Bucket(例如 Bucket2)。我找不到 Google Cloud Storage 的任何 Airflow Operator 来复制文件。

我刚刚在 2 小时前上传的 contrib 中发现了一个新的运算符:https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/gcs_to_gcs.py 称为 GoogleCloudStorageToGoogleCloudStorageOperator,它应该将一个对象从一个存储桶复制到另一个存储桶,并在需要时重命名。

我知道这是一个老问题,但我发现自己也在处理这个任务。由于我使用的是 Google Cloud-Composer,GoogleCloudStorageToGoogleCloudStorageOperator 在当前版本中不可用。 我设法通过使用简单的 BashOperator

解决了这个问题
    from airflow.operators.bash_operator import BashOperator

with models.DAG(
            dag_name,
            schedule_interval=timedelta(days=1),
            default_args=default_dag_args) as dag:

        copy_files = BashOperator(
            task_id='copy_files',
            bash_command='gsutil -m cp <Source Bucket> <Destination Bucket>'
        )

非常简单,可以根据需要创建文件夹并重命名文件。

您可以使用 GoogleCloudStorageToGoogleCloudStorageOperator

以下代码将所有文件从源存储桶移动到目标存储桶。

包裹:https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/gcs_to_gcs/index.html

backup_file = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='Move_File_to_backupBucket',
    source_bucket='adjust_data_03sept2020',
    source_object='*.csv',
    destination_bucket='adjust_data_03sept2020_backup',
    move_object=True,
    google_cloud_storage_conn_id='connection_name',
    dag=dag
)