Deploy Flask app with celery worker on Google Cloud
I have a very simple Flask app example that uses a celery worker to process a task asynchronously:
app.py
import os

from flask import Flask

# make_celery, db and conn_str are defined elsewhere in the app (not shown in this excerpt).
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379')
app.config['CELERY_RESULT_BACKEND'] = os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379')
app.config['SQLALCHEMY_DATABASE_URI'] = conn_str

celery = make_celery(app)
db.init_app(app)


@app.route('/')
def index():
    return "Working"


@app.route('/test')
def test():
    # Queue the task on the celery worker and return its id immediately.
    task = reverse.delay("hello")
    return task.id


@celery.task(name='app.reverse')
def reverse(string):
    return string[::-1]


if __name__ == "__main__":
    app.run()
To run it locally, I run celery -A app.celery worker --loglevel=INFO in one terminal and python app.py in another.
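How can I deploy this application on Google Cloud? I don't want to use Task Queues because it is only compatible with Python 2. Is there good documentation available for doing this? Thanks.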
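App Engine Task Queues is the previous version of Google Cloud Tasks. Cloud Tasks has full support for App Engine Flex/Standard and the Python 3.x runtimes.
You need to create a Cloud Tasks queue and an App Engine service to handle the tasks.
gcloud command to create a queue: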
gcloud tasks queues create [QUEUE_ID]
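The queue ID chosen here is combined with the project ID and location to form the fully qualified queue name that the client code passes as parent. As a rough illustration of that naming scheme, here is a stdlib-only sketch; the function below is a stand-in written for this example (in real code you would call CloudTasksClient.queue_path), and the sample values are the placeholders used in the answer.

```python
def queue_path(project: str, location: str, queue: str) -> str:
    """Build the fully qualified queue name (illustrative stand-in, not the client library)."""
    return f"projects/{project}/locations/{location}/queues/{queue}"


# Placeholder values taken from the comments in the answer's sample code.
print(queue_path("my-project-id", "us-central1", "my-appengine-queue"))
# prints: projects/my-project-id/locations/us-central1/queues/my-appengine-queue
```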
Task handler code
from flask import Flask, request

app = Flask(__name__)


@app.route('/example_task_handler', methods=['POST'])
def example_task_handler():
    """Log the request payload."""
    # Cloud Tasks delivers the task body as the body of the POST request.
    payload = request.get_data(as_text=True) or '(empty payload)'
    print('Received task with payload: {}'.format(payload))
    return 'Printed task payload: {}'.format(payload)
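To connect this back to the question, the body of such a handler could run the same logic as the reverse celery task. The following is a minimal sketch of that idea, kept free of Flask so it can be run on its own; handle_reverse_task and is_app_engine_task are hypothetical names, and the header check assumes (to my understanding of App Engine's behavior) that task requests carry an X-AppEngine-QueueName header.

```python
from typing import Mapping


def reverse(string: str) -> str:
    """Same logic as the celery task in the question."""
    return string[::-1]


def is_app_engine_task(headers: Mapping[str, str]) -> bool:
    """Return True if the request carries the queue-name header (assumed to mark task requests)."""
    return any(name.lower() == "x-appengine-queuename" for name in headers)


def handle_reverse_task(headers: Mapping[str, str], body: bytes) -> tuple:
    """Hypothetical handler core: returns (response_text, http_status)."""
    if not is_app_engine_task(headers):
        return "Forbidden", 403
    # The task body arrives as raw bytes; decode it before processing.
    payload = body.decode("utf-8")
    return reverse(payload), 200
```

Inside the Flask route this would be called roughly as handle_reverse_task(request.headers, request.get_data()). Unlike celery, there is no result backend here, so the reversed string would have to be stored somewhere (for example in the database) if the caller needs it later.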
Code to push a task
import datetime

from google.cloud import tasks_v2


# The enclosing function definition is missing from the excerpt; this
# signature is inferred from the variables used in the body.
def create_task(project, queue, location, payload=None, in_seconds=None):
    """Create a task for a given queue with an arbitrary payload."""
    client = tasks_v2.CloudTasksClient()

    # Replace with your values.
    # project = 'my-project-id'
    # queue = 'my-appengine-queue'
    # location = 'us-central1'
    # payload = 'hello'
    parent = client.queue_path(project, location, queue)

    # Construct the request body.
    task = {
        'app_engine_http_request': {  # Specify the type of request.
            'http_method': tasks_v2.HttpMethod.POST,
            'relative_uri': '/example_task_handler'
        }
    }

    if payload is not None:
        # The API expects a payload of type bytes.
        converted_payload = payload.encode()
        # Add the payload to the request.
        task['app_engine_http_request']['body'] = converted_payload

    if in_seconds is not None:
        # Schedule the task to run in_seconds from now.
        timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)
        # Add the timestamp to the task.
        task['schedule_time'] = timestamp

    # Use the client to build and send the task.
    response = client.create_task(parent=parent, task=task)
    print('Created task {}'.format(response.name))
    return response
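Celery serializes the task name and arguments for you, whereas a Cloud Tasks body is just bytes. One possible way to carry the same information is a small JSON envelope that the pushing code encodes and the handler decodes. This is only a sketch under that assumption; encode_task_body, decode_task_body, dispatch and the TASKS registry are hypothetical names introduced for illustration, not part of any library.

```python
import json


def encode_task_body(task_name: str, *args, **kwargs) -> bytes:
    """Pack a task name and its arguments into bytes suitable for a task body."""
    message = {"task": task_name, "args": list(args), "kwargs": kwargs}
    return json.dumps(message).encode("utf-8")


def decode_task_body(body: bytes):
    """Unpack the bytes produced by encode_task_body."""
    message = json.loads(body.decode("utf-8"))
    return message["task"], message["args"], message["kwargs"]


# Hypothetical registry mapping task names to plain functions.
TASKS = {"app.reverse": lambda string: string[::-1]}


def dispatch(body: bytes):
    """Look up the named task and call it with the decoded arguments."""
    name, args, kwargs = decode_task_body(body)
    return TASKS[name](*args, **kwargs)
```

With this in place, the pushing side would set the task body to encode_task_body("app.reverse", "hello") instead of payload.encode(), and the handler would call dispatch(request.get_data()).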
requirements.txt
Flask==1.1.2
gunicorn==20.0.4
google-cloud-tasks==2.0.0
You can find the complete example on the GCP Python examples GitHub page.