How to fetch/get a Google Cloud Transfer job's run history details in Python?

I have a few Google Cloud Transfer jobs running in my GCP account; they transfer data from Azure to GCS buckets.

According to this documentation - https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/get?apix_params=%7B%22jobName%22%3A%22transferJobs%2F213858246512856794%22%2C%22projectId%22%3A%22merlincloud-gcp-preprod%22%7D - the "get" method can fetch a job's details such as name, description, bucketName, status, includePrefixes, storageAccount, and so on.
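For reference, the call itself looks like this with the Python API client (a minimal sketch; the job and project names are placeholders):

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
service = discovery.build('storagetransfer', 'v1', credentials=credentials)

# Placeholders - substitute your own job name and project ID
job = service.transferJobs().get(jobName='transferJobs/<job_name>',
                                 projectId='<project_id>').execute()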

Here is a sample output of the "get" method:

{
  "name": "transferJobs/<job_name>",
  "description": "<description given while creating job>",
  "projectId": "<project_id>",
  "transferSpec": {
    "gcsDataSink": {
      "bucketName": "<destination_bucket>"
    },
    "objectConditions": {
      "includePrefixes": [
        "<prefix given while creating job>"
      ],
      "lastModifiedSince": "2021-06-30T18:30:00Z"
    },
    "transferOptions": {
      
    },
    "azureBlobStorageDataSource": {
      "storageAccount": "<account_name>",
      "container": "<container_name>"
    }
  },
  "schedule": {
    "scheduleStartDate": {
      "year": 2021,
      "month": 7,
      "day": 1
    },
    "startTimeOfDay": {
      "hours": 13,
      "minutes": 45
    },
    "repeatInterval": "86400s"
  },
  "status": "ENABLED",
  "creationTime": "2021-07-01T06:08:19.392111916Z",
  "lastModificationTime": "2021-07-01T06:13:32.460934533Z",
  "latestOperationName": "transferOperations/transferJobs-<job_name>"
}

Now, how do I fetch the run history details of a particular job in Python?

By "run history details" I mean the metrics shown in the GTS console (data transferred, number of files, status, size, duration), as in the image below.

I'm unfamiliar with the Transfer Service but quite familiar with GCP.

The only other resource that the service provides is transferOperations.

Does it provide the data you need?

If not (!), it may be that Google doesn't expose this functionality outside the console. This happens occasionally, even though the intent is always to be (public) API first.

One way you can investigate is to check the browser's developer tools 'network' tab to see which REST API calls the console makes to fulfill the request. Another way is to run the equivalent gcloud command with --log-http to see the underlying REST API calls.
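To check whether transferOperations has what you need, here is a minimal sketch that lists the operations for a job with the Python API client (the project and job names are placeholders; note that the filter must be passed as a JSON string):

import json

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
service = discovery.build('storagetransfer', 'v1', credentials=credentials)

# Placeholders - substitute your own project ID and job name
op_filter = json.dumps({"projectId": "<project_id>",
                        "jobNames": ["transferJobs/<job_name>"]})
response = service.transferOperations().list(name='transferOperations',
                                             filter=op_filter).execute()
print(json.dumps(response, indent=2))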

As @DazWilkin mentioned, I was able to fetch each job's run history details using the transferOperations - list API.

I wrote a Cloud Function that fetches GTS metrics by calling these APIs. It first makes a transferJobs - list API call, fetches the list of jobs, and keeps only the required jobs' details. It then makes a transferOperations - list API call, passing in the job name, to fetch the run history details.

Here's the code:

import json
import logging
from datetime import datetime

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

"""
requirements.txt
google-api-python-client==2.3.0
oauth2client==4.1.3
"""


class GTSMetrics:

    def __init__(self):
        self.project = "<your_gcp_project_name>"
        self.source_type_mapping = {"gcsDataSource": "Google Cloud Storage", "awsS3DataSource": "Amazon S3",
                                    "azureBlobStorageDataSource": "Azure Storage"}
        self.transfer_job_names = ["transferJobs/<your_job_name>"]
        self.credentials = GoogleCredentials.get_application_default()
        self.service = discovery.build('storagetransfer', 'v1', credentials=self.credentials)
        self.metric_values = {}

    def build_run_history_metrics(self, job=None):
        try:
            if job:
                operation_filters = json.dumps({"projectId": self.project, "jobNames": [job['name']]})

                # The filter parameter must be a JSON-formatted string, not a dict
                request = self.service.transferOperations().list(name='transferOperations', filter=operation_filters)

                while request is not None:
                    response = request.execute()
                    if 'operations' in response:

                        self.metric_values['total_runs'] = len(response['operations'])

                        # Use the first operation returned as the latest run
                        metadata = response['operations'][0]['metadata']

                        status = metadata.get('status', "")
                        start_time = metadata.get('startTime', "")
                        end_time = metadata.get('endTime', "")

                        # Trim the trailing 'Z' and pare nanoseconds down to microseconds
                        # (e.g. "2021-07-01T06:08:19.392111916Z") so strptime can parse the value
                        start_time_object = datetime.strptime(start_time[:-4], "%Y-%m-%dT%H:%M:%S.%f")
                        end_time_object = datetime.strptime(end_time[:-4], "%Y-%m-%dT%H:%M:%S.%f")
                        gts_copy_duration = end_time_object - start_time_object

                        self.metric_values['latest_run_status'] = status
                        self.metric_values['latest_run_time'] = str(start_time_object)
                        self.metric_values['latest_run_errors'] = ""
                        self.metric_values['start_time'] = str(start_time_object)
                        self.metric_values['end_time'] = str(end_time_object)
                        self.metric_values['duration'] = gts_copy_duration.total_seconds()

                        if status == "FAILED":
                            if 'errorBreakdowns' in metadata:
                                errors = metadata['errorBreakdowns'][0]['errorCount']
                                error_code = metadata['errorBreakdowns'][0]['errorCode']
                                self.metric_values['latest_run_errors'] = f"{errors} - {error_code}"
                        elif status == "SUCCESS":
                            counters = metadata['counters']
                            data_bytes = counters.get('bytesCopiedToSink', '0 B')
                            obj_from_src = str(counters.get('objectsFoundFromSource', 0))
                            obj_copied_sink = str(counters.get('objectsCopiedToSink', 0))
                            data_skipped_bytes = counters.get('bytesFromSourceSkippedBySync', '0 B')
                            data_skipped_files = counters.get('objectsFromSourceSkippedBySync', '0')

                            self.metric_values['data_transferred'] = data_bytes
                            self.metric_values['files_found_in_source'] = obj_from_src
                            self.metric_values['files_copied_to_sink'] = obj_copied_sink
                            self.metric_values['data_skipped_in_bytes'] = data_skipped_bytes
                            self.metric_values['data_skipped_files'] = data_skipped_files

                    # Only the latest operation is needed, so stop after the first page;
                    # uncomment the lines below to paginate through all operations instead
                    break
                    # request = self.service.transferOperations().list_next(previous_request=request,
                    #                                                       previous_response=response)

        except Exception as e:
            logging.error(f"Exception in build_run_history_metrics - {str(e)}")

    def build_job_metrics(self, job):
        try:
            transfer_spec = list(job['transferSpec'].keys())

            source = ""
            source_type = ""
            if "gcsDataSource" in transfer_spec:
                source_type = self.source_type_mapping["gcsDataSource"]
                source = job['transferSpec']["gcsDataSource"]["bucketName"]
            elif "awsS3DataSource" in transfer_spec:
                source_type = self.source_type_mapping["awsS3DataSource"]
                source = job['transferSpec']["awsS3DataSource"]["bucketName"]
            elif "azureBlobStorageDataSource" in transfer_spec:
                source_type = self.source_type_mapping["azureBlobStorageDataSource"]

            frequency = "Once"
            schedule = list(job['schedule'].keys())
            if "repeatInterval" in schedule:
                interval = job['schedule']['repeatInterval']

                if interval == "86400s":
                    frequency = "Every day"
                elif interval == "604800s":
                    frequency = "Every week"
                else:
                    frequency = "Custom"

            prefix = ""
            if 'objectConditions' in transfer_spec:
                obj_con = job['transferSpec']['objectConditions']
                if 'includePrefixes' in obj_con:
                    prefix = job['transferSpec']['objectConditions']['includePrefixes'][0]

            self.metric_values['job_description'] = job['description']
            self.metric_values['job_name'] = job['name']
            self.metric_values['source_type'] = source_type
            self.metric_values['source'] = source
            self.metric_values['destination'] = job['transferSpec']['gcsDataSink']['bucketName']
            self.metric_values['frequency'] = frequency
            self.metric_values['prefix'] = prefix
        except Exception as e:
            logging.error(f"Exception in build_job_metrics - {str(e)}")

    def build_metrics(self):
        try:
            # The filter parameter must be a JSON-formatted string, not a dict
            request = self.service.transferJobs().list(filter=json.dumps({"projectId": self.project}))

            while request is not None:
                response = request.execute()

                for transfer_job in response['transferJobs']:
                    if transfer_job['name'] in self.transfer_job_names:
                        
                        # fetch job details
                        self.build_job_metrics(job=transfer_job)

                        # fetch run history details for the job
                        self.build_run_history_metrics(job=transfer_job)

                request = self.service.transferJobs().list_next(previous_request=request, previous_response=response)

            logging.info(f"GTS Metrics - {str(self.metric_values)}")
        except Exception as e:
            logging.error(f"Exception in build_metrics - {str(e)}")


def build_gts_metrics(request):
    gts_metrics = GTSMetrics()
    gts_metrics.build_metrics()
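
For a quick local test outside of Cloud Functions, the entry point can be invoked directly, assuming Application Default Credentials are available (the request argument is only supplied by the Cloud Functions runtime and is unused here):

if __name__ == '__main__':
    # Local test; in Cloud Functions the runtime passes an HTTP request object
    build_gts_metrics(request=None)

The collected metrics are logged as a single dict, with keys such as job_name, source_type, destination, frequency, latest_run_status, duration and data_transferred.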