如何使用 Python 库轮询 Google Long-运行 操作?

How do I poll Google Long-Running Operations using Python Library?

我有一个来自

google.api_core.operation.Operation对象
operation = client.async_batch_annotate_files(requests=[async_request])

我想查看该操作的状态。我正在尝试使用 google.api_core.operations_v1.AbstractOperationsClient。它有方法 get_operation,应该 return 状态。

client=AbstractOperationsClient()
res=client.get_operation(operation.operation.name)

我在 运行:

时出错
ValueError: Request {'name': '*redacted*'} does not match any URL path template in available HttpRule's ['/v1/{name=operations/**}']

错误是由 this code 产生的,我相信。

我可以做的事情是使用由 Vision API ImageAnnotatorClient 实现和公开的 OperationsClient(类似于另一个线程中的 ):

ops_client = vision_v1.ImageAnnotatorClient().transport.operations_client

使用此客户端在传递完整的操作名称时有效,这给了你关于 URL 模板的初始错误:

ops_client.get_operation(vision_operation.operation.name)

查询 get_operation() 为您提供操作的预期元数据,例如用于检测操作何时完成的 done 状态:

ops_client.get_operation(vision_operation.operation.name).done
# Output
False

否则,如果您需要更简单的查询状态的方法,Vision API 操作对象的 running() 方法 returns 一个简单的 TrueFalse 取决于完成状态。您不必以这种方式实例化 OperationsClient

vision_operation.running()

完整代码段(基于 this 指南):

def main():
    input_image_uri="gs://cloud-samples-data/vision/label/wakeupcat.jpg"
    output_uri=<destination_bucket>

    ops_client = vision_v1.ImageAnnotatorClient().transport.operations_client

    # get_vision_op() not included for simplicity, returns the Operation out of async_batch_annotate_images() as shown on the linked guide
    vision_operation = get_vision_op(input_image_uri, output_uri)

    print(ops_client.get_operation(vision_operation.operation.name).done)
    print(vision_operation.running())

    print("Waiting for operation to complete...")
    response = vision_operation.result(90)

    print(vision_operation.running())
    print(ops_client.get_operation(vision_operation.operation.name).done)
    # Output sent to destination bucket
    gcs_output_uri = response.output_config.gcs_destination.uri
    print("Output written to GCS")