在 Airflow 中使用 Google Download Operator 处理多个文件
using the Google Download Operator in Airflow with multiple files
我有一个大查询 table,我需要将其提取并填充到 MSSQL table 中。由于找不到 BigQuerytoMSSQL 运算符,因此我手动执行此操作。
我已经能够将 table 导出到一系列 <>_001.txt、<>_002.txt 等,并将它们存储到 GCS 中,但现在我需要让它们进入 Airflow 服务器。
我正在尝试使用 GoogleDownloadOperator
,但它似乎有一个我无法修复的问题。
Export_to_Local = GoogleCloudStorageDownloadOperator(
task_id='Export_GCS_to_Airflow_Staging',
bucket='offrs',
object='TAX_ASSESSOR_LIVE_*.txt',
filename=Variable.get("temp_directory") + "TAL/*",
google_cloud_storage_conn_id='GCP_Mother_Staging',
dag=dag
)
以上代码导致此错误:
google.resumable_media.common.InvalidResponse: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)
我错过了什么吗?不知道是什么问题
谢谢
GoogleCloudStorageDownloadOperator
不支持通配符,很遗憾。
如果您的 VM 已获得该存储桶的授权,最快的选择是在 BashOperator 中使用 gsutil
命令。
另一种选择是使用以下自定义运算符:
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
WILDCARD = '*'
class CustomGcsDownloadOperator(BaseOperator):
template_fields = ('source_bucket', 'source_object', 'destination_folder',
'destination_object',)
ui_color = '#f0eee4'
@apply_defaults
def __init__(self,
source_bucket,
source_object,
destination_folder,
destination_object=None,
google_cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
last_modified_time=None,
*args,
**kwargs):
super(CustomGcsDownloadOperator,
self).__init__(*args, **kwargs)
self.source_bucket = source_bucket
self.source_object = source_object
self.destination_folder = destination_folder
self.destination_object = destination_object
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to
self.last_modified_time = last_modified_time
def execute(self, context):
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to
)
if WILDCARD in self.source_object:
total_wildcards = self.source_object.count(WILDCARD)
if total_wildcards > 1:
error_msg = "Only one wildcard '*' is allowed in source_object parameter. " \
"Found {} in {}.".format(total_wildcards, self.source_object)
raise AirflowException(error_msg)
prefix, delimiter = self.source_object.split(WILDCARD, 1)
objects = hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter)
for source_object in objects:
if self.destination_object is None:
destination_object = source_object
else:
destination_object = source_object.replace(prefix,
self.destination_object, 1)
self._download_single_object(hook=hook, source_object=source_object,
destination_object=destination_object)
else:
self._download_single_object(hook=hook, source_object=self.source_object,
destination_object=self.destination_object)
def _download_single_object(self, hook, source_object, destination_object):
if self.last_modified_time is not None:
# Check to see if object was modified after last_modified_time
if hook.is_updated_after(self.source_bucket,
source_object,
self.last_modified_time):
self.log.debug("Object has been modified after %s ", self.last_modified_time)
pass
else:
return
self.log.info('Executing copy of gs://%s/%s to file://%s/%s',
self.source_bucket, source_object,
self.destination_folder, destination_object)
hook.download(self.source_bucket, source_object, destination_object)
我有一个大查询 table,我需要将其提取并填充到 MSSQL table 中。由于找不到 BigQuerytoMSSQL 运算符,因此我手动执行此操作。
我已经能够将 table 导出到一系列 <>_001.txt、<>_002.txt 等,并将它们存储到 GCS 中,但现在我需要让它们进入 Airflow 服务器。
我正在尝试使用 GoogleDownloadOperator
,但它似乎有一个我无法修复的问题。
Export_to_Local = GoogleCloudStorageDownloadOperator(
task_id='Export_GCS_to_Airflow_Staging',
bucket='offrs',
object='TAX_ASSESSOR_LIVE_*.txt',
filename=Variable.get("temp_directory") + "TAL/*",
google_cloud_storage_conn_id='GCP_Mother_Staging',
dag=dag
)
以上代码导致此错误:
google.resumable_media.common.InvalidResponse: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)
我错过了什么吗?不知道是什么问题
谢谢
GoogleCloudStorageDownloadOperator
不支持通配符,很遗憾。
如果您的 VM 已获得该存储桶的授权,最快的选择是在 BashOperator 中使用 gsutil
命令。
另一种选择是使用以下自定义运算符:
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
WILDCARD = '*'
class CustomGcsDownloadOperator(BaseOperator):
template_fields = ('source_bucket', 'source_object', 'destination_folder',
'destination_object',)
ui_color = '#f0eee4'
@apply_defaults
def __init__(self,
source_bucket,
source_object,
destination_folder,
destination_object=None,
google_cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
last_modified_time=None,
*args,
**kwargs):
super(CustomGcsDownloadOperator,
self).__init__(*args, **kwargs)
self.source_bucket = source_bucket
self.source_object = source_object
self.destination_folder = destination_folder
self.destination_object = destination_object
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to
self.last_modified_time = last_modified_time
def execute(self, context):
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to
)
if WILDCARD in self.source_object:
total_wildcards = self.source_object.count(WILDCARD)
if total_wildcards > 1:
error_msg = "Only one wildcard '*' is allowed in source_object parameter. " \
"Found {} in {}.".format(total_wildcards, self.source_object)
raise AirflowException(error_msg)
prefix, delimiter = self.source_object.split(WILDCARD, 1)
objects = hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter)
for source_object in objects:
if self.destination_object is None:
destination_object = source_object
else:
destination_object = source_object.replace(prefix,
self.destination_object, 1)
self._download_single_object(hook=hook, source_object=source_object,
destination_object=destination_object)
else:
self._download_single_object(hook=hook, source_object=self.source_object,
destination_object=self.destination_object)
def _download_single_object(self, hook, source_object, destination_object):
if self.last_modified_time is not None:
# Check to see if object was modified after last_modified_time
if hook.is_updated_after(self.source_bucket,
source_object,
self.last_modified_time):
self.log.debug("Object has been modified after %s ", self.last_modified_time)
pass
else:
return
self.log.info('Executing copy of gs://%s/%s to file://%s/%s',
self.source_bucket, source_object,
self.destination_folder, destination_object)
hook.download(self.source_bucket, source_object, destination_object)