将 Google PubSub 与 Golang 结合使用。轮询服务的最有效(成本)方式
Using Google PubSub with Golang. The most efficient (cost) way to poll the service
我们正在从 AMQP 迁移到 Google 的 Pubsub。
The docs suggest pull 可能是我们的最佳选择,因为我们使用的是计算引擎,无法打开我们的 worker 以通过 push 服务接收。
它还说 pull 可能会产生额外的费用,具体取决于使用情况:
If polling is used, high network usage may be incurred if you are
opening connections frequently and closing them immediately.
我们在 go 中创建了一个循环运行的测试订阅者:
func main() {
jsonKey, err := ioutil.ReadFile("pubsub-key.json")
if err != nil {
log.Fatal(err)
}
conf, err := google.JWTConfigFromJSON(
jsonKey,
pubsub.ScopeCloudPlatform,
pubsub.ScopePubSub,
)
if err != nil {
log.Fatal(err)
}
ctx := cloud.NewContext("xxx", conf.Client(oauth2.NoContext))
msgIDs, err := pubsub.Publish(ctx, "topic1", &pubsub.Message{
Data: []byte("hello world"),
})
if err != nil {
log.Println(err)
}
log.Printf("Published a message with a message id: %s\n", msgIDs[0])
for {
msgs, err := pubsub.Pull(ctx, "subscription1", 1)
if err != nil {
log.Println(err)
}
if len(msgs) > 0 {
log.Printf("New message arrived: %v, len: %d\n", msgs[0].ID, len(msgs))
if err := pubsub.Ack(ctx, "subscription1", msgs[0].AckID); err != nil {
log.Fatal(err)
}
log.Println("Acknowledged message")
log.Printf("Message: %s", msgs[0].Data)
}
}
}
我的问题是这是否是正确/推荐的拉取消息方式。
我们全天每秒收到大约 100 条消息。我不确定 运行 无限循环是否会让我们破产并且找不到任何其他像样的 go 示例。
一般来说,在 Cloud Pub/Sub 中拉取订阅者的关键是确保您始终至少有一些未完成的拉取请求 max_messages 设置为适用于以下情况的值:
- 您发布消息的速度,
- 这些消息的大小,以及
- 您的订阅者可以处理消息的速率。
一旦有拉取请求 returns,您应该发出另一个。这意味着异步处理和确认拉取响应中返回给您的消息(或异步启动新的拉取请求)。如果您发现吞吐量或延迟不是您期望的,首先要做的是添加更多并发拉取请求。
如果您的发布率极低,则声明 "if polling is used, high network usage may be incurred if you are opening connections frequently and closing them immediately" 适用。想象一下,您一天只发布两到三个消息,但您不断地使用拉取请求进行轮询。这些拉取请求中的每一个都会产生发出请求的成本,但除了少数几次您实际收到消息外,您不会收到任何消息来处理,因此 "cost per message" 相当高。如果您以相当稳定的速度发布并且您的拉取请求返回非零数量的消息,那么网络使用和成本将与消息速率一致。
我们正在从 AMQP 迁移到 Google 的 Pubsub。
The docs suggest pull 可能是我们的最佳选择,因为我们使用的是计算引擎,无法打开我们的 worker 以通过 push 服务接收。
它还说 pull 可能会产生额外的费用,具体取决于使用情况:
If polling is used, high network usage may be incurred if you are opening connections frequently and closing them immediately.
我们在 go 中创建了一个循环运行的测试订阅者:
func main() {
jsonKey, err := ioutil.ReadFile("pubsub-key.json")
if err != nil {
log.Fatal(err)
}
conf, err := google.JWTConfigFromJSON(
jsonKey,
pubsub.ScopeCloudPlatform,
pubsub.ScopePubSub,
)
if err != nil {
log.Fatal(err)
}
ctx := cloud.NewContext("xxx", conf.Client(oauth2.NoContext))
msgIDs, err := pubsub.Publish(ctx, "topic1", &pubsub.Message{
Data: []byte("hello world"),
})
if err != nil {
log.Println(err)
}
log.Printf("Published a message with a message id: %s\n", msgIDs[0])
for {
msgs, err := pubsub.Pull(ctx, "subscription1", 1)
if err != nil {
log.Println(err)
}
if len(msgs) > 0 {
log.Printf("New message arrived: %v, len: %d\n", msgs[0].ID, len(msgs))
if err := pubsub.Ack(ctx, "subscription1", msgs[0].AckID); err != nil {
log.Fatal(err)
}
log.Println("Acknowledged message")
log.Printf("Message: %s", msgs[0].Data)
}
}
}
我的问题是这是否是正确/推荐的拉取消息方式。
我们全天每秒收到大约 100 条消息。我不确定 运行 无限循环是否会让我们破产并且找不到任何其他像样的 go 示例。
一般来说,在 Cloud Pub/Sub 中拉取订阅者的关键是确保您始终至少有一些未完成的拉取请求 max_messages 设置为适用于以下情况的值:
- 您发布消息的速度,
- 这些消息的大小,以及
- 您的订阅者可以处理消息的速率。
一旦有拉取请求 returns,您应该发出另一个。这意味着异步处理和确认拉取响应中返回给您的消息(或异步启动新的拉取请求)。如果您发现吞吐量或延迟不是您期望的,首先要做的是添加更多并发拉取请求。
如果您的发布率极低,则声明 "if polling is used, high network usage may be incurred if you are opening connections frequently and closing them immediately" 适用。想象一下,您一天只发布两到三个消息,但您不断地使用拉取请求进行轮询。这些拉取请求中的每一个都会产生发出请求的成本,但除了少数几次您实际收到消息外,您不会收到任何消息来处理,因此 "cost per message" 相当高。如果您以相当稳定的速度发布并且您的拉取请求返回非零数量的消息,那么网络使用和成本将与消息速率一致。