如何从云 运行 运行 请求中并行调用同一个云 运行?
how to invoke the same cloud run from a cloud run to run requests parallely?
我正在 运行使用云 运行 进行 ETL 处理。
我有 2000 个文件。只有 1200 个文件在 BIG Query 中得到预处理和加载。因为云 运行 正在超时。于是,我想到了分担。
我将 2000 个文件分成 4 个一组,每个 500 个文件并进行身份验证并使用 requests.post 调用相同的云 运行。然而,它使用相同的云实例 运行 执行一组接一组。它再次超时
我怎样才能做到 运行 并行?
截至目前,最大实例数:20。并发数:1,CPU:2,内存:8GB。
好吧,我已经研究过类似的东西。我不确定它是否对您有帮助,因为您还没有共享一个代码块。这是我要下载的 2k JSON 个文件的示例代码。
您有 2000 个文件,其中 1200 个正在 processed/loaded 进入 GBQ,然后云 运行 超时。您可以做的是:
total_files = len(file_list)//1000 #let file list be 2000, total files will be 2.
#divide the files into sets of 1000 and loop over them one by one
for file_set in range(1,(total_files+1)):
auth_and_trigger(file_list[(file_set-1)*1000:(file_set*1000)])
#for files left after 1000*i , we finally trigger it.
auth_and_trigger(file_list[(total_files)*1000:len(file_list)])
现在这就是您可以调用云 运行 的方式,每 1000 个文件具有验证和触发功能。
def auth_and_trigger(self, rest_of_files):
#your cloud run url
receiving_service_url = 'https://cloudrun-url-uc.a.run.app/download'
# Set up metadata server request
# See https://cloud.google.com/compute/docs/instances/verifying-instance-identity#request_signature
metadata_server_token_url = 'http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience='
token_request_url = metadata_server_token_url + receiving_service_url
token_request_headers = {'Metadata-Flavor': 'Google'}
# Fetch the token
token_response = requests.get(token_request_url, headers=token_request_headers)
jwt = token_response.content.decode("utf-8")
# Provide the token in the request to the receiving service
receiving_service_headers = {'Authorization': f'bearer {jwt}'}
try:
threading.Thread(target=self.trigger_ingest,
args=(receiving_service_url,
{"files": rest_of_files},
receiving_service_headers
)).start()
except Exception as error:
logging.error(error)
每个线程将调用一个函数 trigger_ingest,该函数将调用云 运行。它的代码如下:
def trigger_ingest(url, json, headers=""):
service_response = requests.post(url=url,
json=json,
headers=headers
)
logging.info(service_response.content)
既然您想要并行执行,请确保没有代码在线程中重复,就像在云触发器中一样运行。
我正在 运行使用云 运行 进行 ETL 处理。
我有 2000 个文件。只有 1200 个文件在 BIG Query 中得到预处理和加载。因为云 运行 正在超时。于是,我想到了分担。
我将 2000 个文件分成 4 个一组,每个 500 个文件并进行身份验证并使用 requests.post 调用相同的云 运行。然而,它使用相同的云实例 运行 执行一组接一组。它再次超时
我怎样才能做到 运行 并行?
截至目前,最大实例数:20。并发数:1,CPU:2,内存:8GB。
好吧,我已经研究过类似的东西。我不确定它是否对您有帮助,因为您还没有共享一个代码块。这是我要下载的 2k JSON 个文件的示例代码。
您有 2000 个文件,其中 1200 个正在 processed/loaded 进入 GBQ,然后云 运行 超时。您可以做的是:
total_files = len(file_list)//1000 #let file list be 2000, total files will be 2.
#divide the files into sets of 1000 and loop over them one by one
for file_set in range(1,(total_files+1)):
auth_and_trigger(file_list[(file_set-1)*1000:(file_set*1000)])
#for files left after 1000*i , we finally trigger it.
auth_and_trigger(file_list[(total_files)*1000:len(file_list)])
现在这就是您可以调用云 运行 的方式,每 1000 个文件具有验证和触发功能。
def auth_and_trigger(self, rest_of_files):
#your cloud run url
receiving_service_url = 'https://cloudrun-url-uc.a.run.app/download'
# Set up metadata server request
# See https://cloud.google.com/compute/docs/instances/verifying-instance-identity#request_signature
metadata_server_token_url = 'http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience='
token_request_url = metadata_server_token_url + receiving_service_url
token_request_headers = {'Metadata-Flavor': 'Google'}
# Fetch the token
token_response = requests.get(token_request_url, headers=token_request_headers)
jwt = token_response.content.decode("utf-8")
# Provide the token in the request to the receiving service
receiving_service_headers = {'Authorization': f'bearer {jwt}'}
try:
threading.Thread(target=self.trigger_ingest,
args=(receiving_service_url,
{"files": rest_of_files},
receiving_service_headers
)).start()
except Exception as error:
logging.error(error)
每个线程将调用一个函数 trigger_ingest,该函数将调用云 运行。它的代码如下:
def trigger_ingest(url, json, headers=""):
service_response = requests.post(url=url,
json=json,
headers=headers
)
logging.info(service_response.content)
既然您想要并行执行,请确保没有代码在线程中重复,就像在云触发器中一样运行。