How to fetch/get a Google Cloud Transfer job's run history details in Python?
A few Google Cloud Transfer jobs are running in my GCP account, transferring data from Azure to a GCS bucket.
According to this documentation - https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/get?apix_params=%7B%22jobName%22%3A%22transferJobs%2F213858246512856794%22%2C%22projectId%22%3A%22merlincloud-gcp-preprod%22%7D - the "get" method can fetch a job's details such as name, description, bucketName, status, includePrefixes, storageAccount, and so on.
Here is a sample output of the "get" method:
{
  "name": "transferJobs/<job_name>",
  "description": "<description given while creating job>",
  "projectId": "<project_id>",
  "transferSpec": {
    "gcsDataSink": {
      "bucketName": "<destination_bucket>"
    },
    "objectConditions": {
      "includePrefixes": [
        "<prefix given while creating job>"
      ],
      "lastModifiedSince": "2021-06-30T18:30:00Z"
    },
    "transferOptions": {
    },
    "azureBlobStorageDataSource": {
      "storageAccount": "<account_name>",
      "container": "<container_name>"
    }
  },
  "schedule": {
    "scheduleStartDate": {
      "year": 2021,
      "month": 7,
      "day": 1
    },
    "startTimeOfDay": {
      "hours": 13,
      "minutes": 45
    },
    "repeatInterval": "86400s"
  },
  "status": "ENABLED",
  "creationTime": "2021-07-01T06:08:19.392111916Z",
  "lastModificationTime": "2021-07-01T06:13:32.460934533Z",
  "latestOperationName": "transferOperations/transferJobs-<job_name>"
}
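For context, this is roughly how I call the "get" method from Python (a minimal sketch assuming google-api-python-client with Application Default Credentials; the job and project names are placeholders):

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
service = discovery.build('storagetransfer', 'v1', credentials=credentials)

# transferJobs.get needs the job name plus the project that owns the job.
job = service.transferJobs().get(jobName='transferJobs/<job_name>',
                                 projectId='<project_id>').execute()
print(job['status'], job['latestOperationName'])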
Now, how do I fetch the run history details of a particular job in Python?
By "run history details" I mean the metrics shown in the GTS console (data transferred, number of files, status, size, duration), as in the screenshot below.
I'm not familiar with the Transfer Service, but I'm very familiar with GCP. The only other resource that the service provides is transferOperations.
Does it provide the data you need?
If not (!), it may be that Google doesn't expose this functionality beyond the console. This happens occasionally, even though the intent is always to be (public) API first.
One way you can investigate is to check the browser's developer tools 'Network' tab to see what REST API calls the console makes to fulfill the request. Another way is to use the equivalent gcloud command with --log-http added to see the underlying REST API calls.
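If you want to probe transferOperations quickly from Python, a minimal sketch might look like this (placeholders for project and job; note that the filter parameter is a JSON string):

import json
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
service = discovery.build('storagetransfer', 'v1', credentials=credentials)

# transferOperations.list takes the literal collection name plus a
# JSON-encoded filter of the project and job names.
op_filter = json.dumps({"projectId": "<project_id>",
                        "jobNames": ["transferJobs/<job_name>"]})
ops = service.transferOperations().list(name='transferOperations',
                                        filter=op_filter).execute()
for op in ops.get('operations', []):
    print(op['metadata'].get('status'), op['metadata'].get('counters'))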
As @DazWilkin mentioned, I was able to fetch each job's run history details using the transferOperations - list API.
I wrote a Cloud Function that fetches GTS metrics by calling these APIs. It first makes a transferJobs - list call to get the list of jobs and keeps only the details of the required jobs. Then it makes a transferOperations - list call, passing the job names, to fetch the run history details.
Here is the code:
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
from datetime import datetime
import json
import logging

"""
requirements.txt
google-api-python-client==2.3.0
oauth2client==4.1.3
"""


class GTSMetrics:
    def __init__(self):
        self.project = "<your_gcp_project_name>"
        self.source_type_mapping = {"gcsDataSource": "Google Cloud Storage",
                                    "awsS3DataSource": "Amazon S3",
                                    "azureBlobStorageDataSource": "Azure Storage"}
        self.transfer_job_names = ["transferJobs/<your_job_name>"]
        self.credentials = GoogleCredentials.get_application_default()
        self.service = discovery.build('storagetransfer', 'v1', credentials=self.credentials)
        self.metric_values = {}

    def build_run_history_metrics(self, job=None):
        try:
            if job:
                # The "filter" query parameter must be JSON text, hence json.dumps.
                operation_filters = json.dumps({"projectId": self.project, "jobNames": [job['name']]})
                request = self.service.transferOperations().list(name='transferOperations',
                                                                 filter=operation_filters)
                while request is not None:
                    response = request.execute()
                    if 'operations' in response:
                        self.metric_values['total_runs'] = len(response['operations'])
                        # Treat the first operation in the response as the latest run.
                        metadata = response['operations'][0]['metadata']
                        status = metadata.get('status', "")
                        start_time = metadata.get('startTime', "")
                        end_time = metadata.get('endTime', "")
                        # Trim the timestamps to microsecond precision (drop the last
                        # three nanosecond digits and the trailing "Z") for strptime.
                        start_time_object = datetime.strptime(start_time[:-4], "%Y-%m-%dT%H:%M:%S.%f")
                        end_time_object = datetime.strptime(end_time[:-4], "%Y-%m-%dT%H:%M:%S.%f")
                        gts_copy_duration = end_time_object - start_time_object
                        self.metric_values['latest_run_status'] = status
                        self.metric_values['latest_run_time'] = str(start_time_object)
                        self.metric_values['latest_run_errors'] = ""
                        self.metric_values['start_time'] = str(start_time_object)
                        self.metric_values['end_time'] = str(end_time_object)
                        self.metric_values['duration'] = gts_copy_duration.total_seconds()
                        if status == "FAILED":
                            if 'errorBreakdowns' in metadata:
                                errors = metadata['errorBreakdowns'][0]['errorCount']
                                error_code = metadata['errorBreakdowns'][0]['errorCode']
                                self.metric_values['latest_run_errors'] = f"{errors} - {error_code}"
                        elif status == "SUCCESS":
                            counters = metadata['counters']
                            data_bytes = counters.get('bytesCopiedToSink', '0 B')
                            obj_from_src = str(counters.get('objectsFoundFromSource', 0))
                            obj_copied_sink = str(counters.get('objectsCopiedToSink', 0))
                            data_skipped_bytes = counters.get('bytesFromSourceSkippedBySync', '0 B')
                            data_skipped_files = counters.get('objectsFromSourceSkippedBySync', '0')
                            self.metric_values['data_transferred'] = data_bytes
                            self.metric_values['files_found_in_source'] = obj_from_src
                            self.metric_values['files_copied_to_sink'] = obj_copied_sink
                            self.metric_values['data_skipped_in_bytes'] = data_skipped_bytes
                            self.metric_values['data_skipped_files'] = data_skipped_files
                    # Only the latest run is needed; uncomment list_next to paginate through all runs.
                    # request = self.service.transferOperations().list_next(previous_request=request,
                    #                                                       previous_response=response)
                    break
        except Exception as e:
            logging.error(f"Exception in build_run_history_metrics - {str(e)}")

    def build_job_metrics(self, job):
        try:
            transfer_spec = list(job['transferSpec'].keys())
            source = ""
            source_type = ""
            if "gcsDataSource" in transfer_spec:
                source_type = self.source_type_mapping["gcsDataSource"]
                source = job['transferSpec']["gcsDataSource"]["bucketName"]
            elif "awsS3DataSource" in transfer_spec:
                source_type = self.source_type_mapping["awsS3DataSource"]
                source = job['transferSpec']["awsS3DataSource"]["bucketName"]
            elif "azureBlobStorageDataSource" in transfer_spec:
                source_type = self.source_type_mapping["azureBlobStorageDataSource"]
                # Azure sources are identified by storage account rather than bucket.
                source = job['transferSpec']["azureBlobStorageDataSource"]["storageAccount"]
            frequency = "Once"
            schedule = list(job['schedule'].keys())
            if "repeatInterval" in schedule:
                interval = job['schedule']['repeatInterval']
                if interval == "86400s":
                    frequency = "Every day"
                elif interval == "604800s":
                    frequency = "Every week"
                else:
                    frequency = "Custom"
            prefix = ""
            if 'objectConditions' in transfer_spec:
                obj_con = job['transferSpec']['objectConditions']
                if 'includePrefixes' in obj_con:
                    prefix = obj_con['includePrefixes'][0]
            self.metric_values['job_description'] = job['description']
            self.metric_values['job_name'] = job['name']
            self.metric_values['source_type'] = source_type
            self.metric_values['source'] = source
            self.metric_values['destination'] = job['transferSpec']['gcsDataSink']['bucketName']
            self.metric_values['frequency'] = frequency
            self.metric_values['prefix'] = prefix
        except Exception as e:
            logging.error(f"Exception in build_job_metrics - {str(e)}")

    def build_metrics(self):
        try:
            # As above, the "filter" query parameter must be JSON text.
            request = self.service.transferJobs().list(filter=json.dumps({"projectId": self.project}))
            while request is not None:
                response = request.execute()
                for transfer_job in response['transferJobs']:
                    if transfer_job['name'] in self.transfer_job_names:
                        # fetch job details
                        self.build_job_metrics(job=transfer_job)
                        # fetch run history details for the job
                        self.build_run_history_metrics(job=transfer_job)
                request = self.service.transferJobs().list_next(previous_request=request,
                                                                previous_response=response)
            logging.info(f"GTS Metrics - {str(self.metric_values)}")
        except Exception as e:
            logging.error(f"Exception in build_metrics - {str(e)}")


def build_gts_metrics(request):
    gts_metrics = GTSMetrics()
    gts_metrics.build_metrics()
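For a quick local test outside Cloud Functions, the HTTP entry point can be invoked directly, since the request argument is unused (a sketch, not part of the deployed function):

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    build_gts_metrics(None)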