Google Cloud App Engine:是否存在对 disable/enable 应用的 API 或 SDK 调用 (python)
Google Cloud App Engine: Is there an API or SDK call to disable/enable an app (python)
我一直在通读文档,但找不到有关如何启用或禁用实际应用程序的任何说明。
我尝试 运行 一个需要 5 分钟到 1 小时处理的 python 程序。希望在处理开始之前自动启用应用程序并在完成后禁用。
如有任何提示,我们将不胜感激。
Enable/Disabling App Engine 应用程序是来自 Cloud Console 的非常简单的过程。
简单参考these steps as per the public documentation。请注意,该过程仅包括单击 Cloud Console 中“应用程序”设置页面上的按钮。一旦应用程序被禁用,如果您想返回启用应用程序,只需返回同一页面并单击同一按钮(应显示启用应用程序,而不是禁用应用程序)。
以编程方式完成此任务肯定有点棘手。尽管如此,它可以通过 App Engine Admin API and it's client libraries, more specifically by taking advantage of the apps.patch method.
来实现
文档中有一个完整的 example for disabling an application 使用带有 Python 运行时的云函数。它基于预算警报,但您可以根据您的特定需求对其进行修改,并建立您想要触发 Cloud Function 的条件。
我会创建两个不同的 Cloud Functions。一个用于启用应用程序,另一个用于禁用应用程序。代码中最相关的部分是这个(但请查看 example described 了解详细信息):
一个。禁用功能
import os
from googleapiclient import discovery
APP_NAME = os.getenv('GCP_PROJECT')
def limit_use_appengine():
if [CHANGE CONDITION AS PER YOUR NEEDS]:
print(f'No action necessary.')
return
appengine = discovery.build(
'appengine',
'v1',
cache_discovery=False
)
apps = appengine.apps()
# Get the target app's serving status
target_app = apps.get(appsId=APP_NAME).execute()
current_status = target_app['servingStatus']
# Disable target app, if necessary
if current_status == 'SERVING':
print(f'Attempting to disable app {APP_NAME}...')
body = {'servingStatus': 'USER_DISABLED'}
apps.patch(appsId=APP_NAME, updateMask='serving_status', body=body).execute()
b。启用应用程序
import os
from googleapiclient import discovery
APP_NAME = os.getenv('GCP_PROJECT')
def enable_use_appengine():
if [CHANGE CONDITION AS PER YOUR NEEDS]:
print(f'No action necessary.')
return
appengine = discovery.build(
'appengine',
'v1',
cache_discovery=False
)
apps = appengine.apps()
# Get the target app's serving status
target_app = apps.get(appsId=APP_NAME).execute()
current_status = target_app['servingStatus']
# Disable target app, if necessary
if current_status == 'USER_DISABLED':
print(f'Attempting to enable app {APP_NAME}...')
body = {'servingStatus': 'SERVING'}
apps.patch(appsId=APP_NAME, updateMask='serving_status', body=body).execute()
我还没有测试代码本身。但我相信它们会奏效。了解禁用 App Engine 应用程序将停止与应用程序服务请求相关的费用非常重要,但是如果您使用 Datastore 或 Cloud Storage 或任何其他产品来存储您的应用程序生成的一些数据,这些费用仍会累积.
从 python 3.8 开始,由于删除了 GCP_PROJECT 环境变量,此代码不再有效,而且 Google 的代码也不允许测试云功能,这我也加了
import base64
import json
import os
from googleapiclient import discovery
import google.auth
_, project_id = google.auth.default()
APP_NAME = project_id
def limit_use_appengine(data, context):
if 'data' in data.keys():
pubsub_data = base64.b64decode(data['data']).decode('utf-8')
pubsub_json = json.loads(pubsub_data)
else:
pubsub_json = data
cost_amount = pubsub_json['costAmount']
budget_amount = pubsub_json['budgetAmount']
if cost_amount <= budget_amount:
print(f'No action necessary. (Current cost: {cost_amount})')
return
appengine = discovery.build(
'appengine',
'v1',
cache_discovery=False
)
apps = appengine.apps()
# Get the target app's serving status
target_app = apps.get(appsId=APP_NAME).execute()
current_status = target_app['servingStatus']
# Disable target app, if necessary
if current_status == 'SERVING':
print(f'Attempting to disable app {APP_NAME}...')
body = {'servingStatus': 'USER_DISABLED'}
apps.patch(appsId=APP_NAME, updateMask='serving_status', body=body).execute()
我一直在通读文档,但找不到有关如何启用或禁用实际应用程序的任何说明。
我尝试 运行 一个需要 5 分钟到 1 小时处理的 python 程序。希望在处理开始之前自动启用应用程序并在完成后禁用。
如有任何提示,我们将不胜感激。
Enable/Disabling App Engine 应用程序是来自 Cloud Console 的非常简单的过程。
简单参考these steps as per the public documentation。请注意,该过程仅包括单击 Cloud Console 中“应用程序”设置页面上的按钮。一旦应用程序被禁用,如果您想返回启用应用程序,只需返回同一页面并单击同一按钮(应显示启用应用程序,而不是禁用应用程序)。
以编程方式完成此任务肯定有点棘手。尽管如此,它可以通过 App Engine Admin API and it's client libraries, more specifically by taking advantage of the apps.patch method.
来实现文档中有一个完整的 example for disabling an application 使用带有 Python 运行时的云函数。它基于预算警报,但您可以根据您的特定需求对其进行修改,并建立您想要触发 Cloud Function 的条件。
我会创建两个不同的 Cloud Functions。一个用于启用应用程序,另一个用于禁用应用程序。代码中最相关的部分是这个(但请查看 example described 了解详细信息):
一个。禁用功能
import os
from googleapiclient import discovery
APP_NAME = os.getenv('GCP_PROJECT')
def limit_use_appengine():
if [CHANGE CONDITION AS PER YOUR NEEDS]:
print(f'No action necessary.')
return
appengine = discovery.build(
'appengine',
'v1',
cache_discovery=False
)
apps = appengine.apps()
# Get the target app's serving status
target_app = apps.get(appsId=APP_NAME).execute()
current_status = target_app['servingStatus']
# Disable target app, if necessary
if current_status == 'SERVING':
print(f'Attempting to disable app {APP_NAME}...')
body = {'servingStatus': 'USER_DISABLED'}
apps.patch(appsId=APP_NAME, updateMask='serving_status', body=body).execute()
b。启用应用程序
import os
from googleapiclient import discovery
APP_NAME = os.getenv('GCP_PROJECT')
def enable_use_appengine():
if [CHANGE CONDITION AS PER YOUR NEEDS]:
print(f'No action necessary.')
return
appengine = discovery.build(
'appengine',
'v1',
cache_discovery=False
)
apps = appengine.apps()
# Get the target app's serving status
target_app = apps.get(appsId=APP_NAME).execute()
current_status = target_app['servingStatus']
# Disable target app, if necessary
if current_status == 'USER_DISABLED':
print(f'Attempting to enable app {APP_NAME}...')
body = {'servingStatus': 'SERVING'}
apps.patch(appsId=APP_NAME, updateMask='serving_status', body=body).execute()
我还没有测试代码本身。但我相信它们会奏效。了解禁用 App Engine 应用程序将停止与应用程序服务请求相关的费用非常重要,但是如果您使用 Datastore 或 Cloud Storage 或任何其他产品来存储您的应用程序生成的一些数据,这些费用仍会累积.
从 python 3.8 开始,由于删除了 GCP_PROJECT 环境变量,此代码不再有效,而且 Google 的代码也不允许测试云功能,这我也加了
import base64
import json
import os
from googleapiclient import discovery
import google.auth
_, project_id = google.auth.default()
APP_NAME = project_id
def limit_use_appengine(data, context):
if 'data' in data.keys():
pubsub_data = base64.b64decode(data['data']).decode('utf-8')
pubsub_json = json.loads(pubsub_data)
else:
pubsub_json = data
cost_amount = pubsub_json['costAmount']
budget_amount = pubsub_json['budgetAmount']
if cost_amount <= budget_amount:
print(f'No action necessary. (Current cost: {cost_amount})')
return
appengine = discovery.build(
'appengine',
'v1',
cache_discovery=False
)
apps = appengine.apps()
# Get the target app's serving status
target_app = apps.get(appsId=APP_NAME).execute()
current_status = target_app['servingStatus']
# Disable target app, if necessary
if current_status == 'SERVING':
print(f'Attempting to disable app {APP_NAME}...')
body = {'servingStatus': 'USER_DISABLED'}
apps.patch(appsId=APP_NAME, updateMask='serving_status', body=body).execute()