Using the Google Download Operator in Airflow with multiple files

I have a BigQuery table that I need to extract and load into an MSSQL table. Since I can't find a BigQueryToMSSQL operator, I'm doing this manually.

I've already been able to export the table to a series of files (<>_001.txt, <>_002.txt, etc.) stored in GCS, as sketched below, but now I need to get them onto the Airflow server.
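For context, a minimal sketch of that export step using BigQueryToCloudStorageOperator (the project and dataset names here are placeholders, not my real ones):

from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator

# Sketch: a wildcard in the destination URI makes BigQuery shard the
# export into multiple files in the bucket.
Export_to_GCS = BigQueryToCloudStorageOperator(
    task_id='Export_BQ_to_GCS',
    source_project_dataset_table='my_project.my_dataset.TAX_ASSESSOR_LIVE',
    destination_cloud_storage_uris=['gs://offrs/TAX_ASSESSOR_LIVE_*.txt'],
    export_format='CSV',
    bigquery_conn_id='GCP_Mother_Staging',
    dag=dag,
)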

I'm trying to use the GoogleCloudStorageDownloadOperator, but it seems to have a problem I can't fix.

Export_to_Local = GoogleCloudStorageDownloadOperator(
    task_id='Export_GCS_to_Airflow_Staging',
    bucket='offrs',
    object='TAX_ASSESSOR_LIVE_*.txt',
    filename=Variable.get("temp_directory") + "TAL/*",
    google_cloud_storage_conn_id='GCP_Mother_Staging',
    dag=dag
)

The code above produces this error:

google.resumable_media.common.InvalidResponse: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)

Am I missing something? I can't tell what the problem is.

Thanks

Unfortunately, the GoogleCloudStorageDownloadOperator does not support wildcards.

If your VM is already authorized to access the bucket, the quickest option is to run a gsutil command from a BashOperator, as sketched below.
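A minimal sketch of that approach, reusing the bucket, file pattern, and Variable from your question (it assumes gsutil is installed and authenticated on the worker):

from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

# Sketch: -m parallelizes the copy; the quoted wildcard is expanded by
# gsutil against the bucket, not by the shell against local files.
Export_to_Local = BashOperator(
    task_id='Export_GCS_to_Airflow_Staging',
    bash_command=(
        'gsutil -m cp "gs://offrs/TAX_ASSESSOR_LIVE_*.txt" '
        '{}TAL/'.format(Variable.get("temp_directory"))
    ),
    dag=dag,
)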

Another option is to use the following custom operator:

import os

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException

WILDCARD = '*'


class CustomGcsDownloadOperator(BaseOperator):
    template_fields = ('source_bucket', 'source_object', 'destination_folder',
                       'destination_object',)
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 source_bucket,
                 source_object,
                 destination_folder,
                 destination_object=None,
                 google_cloud_storage_conn_id='google_cloud_default',
                 delegate_to=None,
                 last_modified_time=None,
                 *args,
                 **kwargs):
        super(CustomGcsDownloadOperator,
              self).__init__(*args, **kwargs)
        self.source_bucket = source_bucket
        self.source_object = source_object
        self.destination_folder = destination_folder
        self.destination_object = destination_object
        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
        self.delegate_to = delegate_to
        self.last_modified_time = last_modified_time

    def execute(self, context):
        hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to
        )

        if WILDCARD in self.source_object:
            total_wildcards = self.source_object.count(WILDCARD)
            if total_wildcards > 1:
                error_msg = "Only one wildcard '*' is allowed in source_object parameter. " \
                            "Found {} in {}.".format(total_wildcards, self.source_object)

                raise AirflowException(error_msg)

            # Everything before the wildcard becomes the list prefix and
            # everything after it the delimiter for hook.list().
            prefix, delimiter = self.source_object.split(WILDCARD, 1)
            objects = hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter)

            for source_object in objects:
                # Keep each object's own name unless a destination_object
                # prefix was supplied to re-root the downloads.
                if self.destination_object is None:
                    destination_object = source_object
                else:
                    destination_object = source_object.replace(prefix,
                                                               self.destination_object, 1)

                self._download_single_object(hook=hook,
                                             source_object=source_object,
                                             destination_object=destination_object)
        else:
            self._download_single_object(hook=hook,
                                         source_object=self.source_object,
                                         destination_object=self.destination_object)

    def _download_single_object(self, hook, source_object, destination_object):
        if self.last_modified_time is not None:
            # Only download the object if it was modified after last_modified_time.
            if hook.is_updated_after(self.source_bucket,
                                     source_object,
                                     self.last_modified_time):
                self.log.debug("Object has been modified after %s ", self.last_modified_time)
            else:
                return

        self.log.info('Executing download of gs://%s/%s to file://%s/%s',
                      self.source_bucket, source_object,
                      self.destination_folder, destination_object)

        # Build the local path inside destination_folder and make sure the
        # directory exists before handing the filename to the hook.
        local_path = os.path.join(self.destination_folder, destination_object)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        hook.download(self.source_bucket, source_object, local_path)
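With that in place, your task could look like this (a sketch, with the parameter values carried over from your original operator):

Export_to_Local = CustomGcsDownloadOperator(
    task_id='Export_GCS_to_Airflow_Staging',
    source_bucket='offrs',
    source_object='TAX_ASSESSOR_LIVE_*.txt',
    destination_folder=Variable.get("temp_directory") + "TAL/",
    google_cloud_storage_conn_id='GCP_Mother_Staging',
    dag=dag,
)

Each file matching the prefix before the wildcard is then downloaded into destination_folder under its own object name.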