在 Google 云函数中使用 PubSub
Consume PubSub in Google cloud function
以官方为准document
我尝试使用“PubSub Pull Subscription”触发器创建云函数
import base64
def hello_pubsub(event, context):
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
print("This Function was triggered by messageId {} published at {}".format(context.event_id, context.timestamp))
if 'data' in event:
name = base64.b64decode(event['data']).decode('utf-8')
print('"{}" received!'.format(name))
if 'attributes' in event:
print(event['attributes'])
if '@type' in event:
print(event['@type'])
然后我发现一个说“cloud function will send ACK on its invocation”,这和官方文档是一致的。
但是,当云函数处理完 PubSub 消息后,“Unacked message count”并没有减少(如上图所示)
因此,我在本地
上尝试 google-cloud-pubsub
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
response = subscriber.pull(subscription_path, max_messages=5)
for msg in response.received_messages:
print("Received message:", msg.message.data)
ack_ids = [msg.ack_id for msg in response.received_messages]
subscriber.acknowledge(subscription_path, ack_ids)
这样,消息数减少成功。
我的问题是:
- 我的云函数脚本中是否遗漏了什么?
- 如何在我的云函数中实际“使用”PubSub 消息?
欢迎任何建议,谢谢。
有了 PubSub,您就有了发布者,可以将消息发布到主题中。该消息在每个现有订阅(在主题上创建)中重复。最后,订阅者可以收听订阅。
所以,在这里,您有 1 个主题和 1 个请求订阅。您还有一个部署在 topic 上的 Cloud Functions(在 gcloud cli 中,参数 --trigger-topic=myTopic
)。 关于主题,而不是订阅。
返回订阅页面,您应该会看到 2 个订阅:您的请求订阅和对陌生端点的推送订阅
因此,您的消息发布在 2 个订阅中。如果您查看您的 Pull 订阅,除了您在本地的代码外,没有任何内容会消耗其中的消息。云函数中的日志应该显示正确的消息处理。
是不是更清楚了?
编辑
准确地说是你的情况:
- 您的 Cloud Functions 无法确认请求订阅中的消息,因为它没有连接到它
- 您的 Cloud Functions 处理并确认在其自己的订阅中发布的消息。
以官方为准document
我尝试使用“PubSub Pull Subscription”触发器创建云函数
import base64
def hello_pubsub(event, context):
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
print("This Function was triggered by messageId {} published at {}".format(context.event_id, context.timestamp))
if 'data' in event:
name = base64.b64decode(event['data']).decode('utf-8')
print('"{}" received!'.format(name))
if 'attributes' in event:
print(event['attributes'])
if '@type' in event:
print(event['@type'])
然后我发现一个
但是,当云函数处理完 PubSub 消息后,“Unacked message count”并没有减少(如上图所示)
因此,我在本地
上尝试 google-cloud-pubsubsubscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
response = subscriber.pull(subscription_path, max_messages=5)
for msg in response.received_messages:
print("Received message:", msg.message.data)
ack_ids = [msg.ack_id for msg in response.received_messages]
subscriber.acknowledge(subscription_path, ack_ids)
这样,消息数减少成功。
我的问题是:
- 我的云函数脚本中是否遗漏了什么?
- 如何在我的云函数中实际“使用”PubSub 消息?
欢迎任何建议,谢谢。
有了 PubSub,您就有了发布者,可以将消息发布到主题中。该消息在每个现有订阅(在主题上创建)中重复。最后,订阅者可以收听订阅。
所以,在这里,您有 1 个主题和 1 个请求订阅。您还有一个部署在 topic 上的 Cloud Functions(在 gcloud cli 中,参数 --trigger-topic=myTopic
)。 关于主题,而不是订阅。
返回订阅页面,您应该会看到 2 个订阅:您的请求订阅和对陌生端点的推送订阅
因此,您的消息发布在 2 个订阅中。如果您查看您的 Pull 订阅,除了您在本地的代码外,没有任何内容会消耗其中的消息。云函数中的日志应该显示正确的消息处理。
是不是更清楚了?
编辑
准确地说是你的情况:
- 您的 Cloud Functions 无法确认请求订阅中的消息,因为它没有连接到它
- 您的 Cloud Functions 处理并确认在其自己的订阅中发布的消息。