如何从云 运行 运行 请求中并行调用同一个云 运行?

how to invoke the same cloud run from a cloud run to run requests parallely?

我正在 运行使用云 运行 进行 ETL 处理。

我有 2000 个文件。只有 1200 个文件在 BIG Query 中得到预处理和加载。因为云 运行 正在超时。于是,我想到了分担。

我将 2000 个文件分成 4 个一组,每个 500 个文件并进行身份验证并使用 requests.post 调用相同的云 运行。然而,它使用相同的云实例 运行 执行一组接一组。它再次超时

我怎样才能做到 运行 并行?

截至目前,最大实例数:20。并发数:1,CPU:2,内存:8GB。

好吧,我已经研究过类似的东西。我不确定它是否对您有帮助,因为您还没有共享一个代码块。这是我要下载的 2k JSON 个文件的示例代码。

您有 2000 个文件,其中 1200 个正在 processed/loaded 进入 GBQ,然后云 运行 超时。您可以做的是:

    total_files = len(file_list)//1000   #let file list be 2000, total files will be 2.

    #divide the files into sets of 1000 and loop over them one by one
    
    for file_set in range(1,(total_files+1)):
        auth_and_trigger(file_list[(file_set-1)*1000:(file_set*1000)])

    #for files left after 1000*i , we finally trigger it.
    auth_and_trigger(file_list[(total_files)*1000:len(file_list)])

现在这就是您可以调用云 运行 的方式,每 1000 个文件具有验证和触发功能。

    def auth_and_trigger(self, rest_of_files):
    #your cloud run url
    receiving_service_url = 'https://cloudrun-url-uc.a.run.app/download'

    # Set up metadata server request
    # See https://cloud.google.com/compute/docs/instances/verifying-instance-identity#request_signature
    metadata_server_token_url = 'http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience='

    token_request_url = metadata_server_token_url + receiving_service_url
    token_request_headers = {'Metadata-Flavor': 'Google'}

    # Fetch the token
    token_response = requests.get(token_request_url, headers=token_request_headers)
    jwt = token_response.content.decode("utf-8")

    # Provide the token in the request to the receiving service
    receiving_service_headers = {'Authorization': f'bearer {jwt}'}

    try:
        threading.Thread(target=self.trigger_ingest,
                         args=(receiving_service_url,
                               {"files": rest_of_files},
                               receiving_service_headers
                               )).start()
    except Exception as error:
        logging.error(error)

每个线程将调用一个函数 trigger_ingest,该函数将调用云 运行。它的代码如下:

    def trigger_ingest(url, json, headers=""):
    service_response = requests.post(url=url,
                                     json=json,
                                     headers=headers
                                     )
    logging.info(service_response.content)

既然您想要并行执行,请确保没有代码在线程中重复,就像在云触发器中一样运行。