C++ 中 Google Pub/Sub 的消费者示例
Consumer example for Google Pub/Sub in C++
我正在尝试 Google Pub/Sub,我需要将它集成到 C++ 代码库中。
由于在 C++ 中没有对 Google Pub/Sub
的原生支持,我通过 gRPC
使用它。因此,我通过 protoc
.
生成了相应的 pubsub.grpc.pb.h
、pubsub.grpc.pb.cc
、pubsub.pb.h
和 pubsub.pb.cc
文件
问题部分:由于缺少文档,如果有一个 C++ 示例会很有帮助。我为发布者部分找到了 example,但没有为订阅者部分找到。我试图深入研究其他语言的生成代码和示例,但出现了很多问题。订户部分有什么例子吗?或者可能有人已经有过这样的经历?
就像发出发布请求一样,您可以发出 StreamingPull 消息请求。请注意,这是一个简单的概念证明,在实践中,您可能希望使这段代码更健壮;例如创建多个流,在线程池上进行消息处理,实现某种流量控制等……
#include <iostream>
#include <memory>
#include <grpc++/grpc++.h>
#include "google/pubsub/v1/pubsub.grpc.pb.h"
auto main() -> int {
using grpc::ClientContext;
using grpc::ClientReaderWriter;
using google::pubsub::v1::Subscriber;
using google::pubsub::v1::StreamingPullRequest;
using google::pubsub::v1::StreamingPullResponse;
auto creds = grpc::GoogleDefaultCredentials();
auto stub = std::make_unique<Subscriber::Stub>(
grpc::CreateChannel("pubsub.googleapis.com", creds));
// Open up the stream.
ClientContext context;
std::unique_ptr<ClientReaderWriter<
StreamingPullRequest, StreamingPullResponse>> stream(
stub->StreamingPull(&context));
// Send initial message.
StreamingPullRequest request;
request.set_subscription(
"projects/pubsub-cpp-api-1504713535863/subscriptions/testing");
request.set_stream_ack_deadline_seconds(10);
stream->Write(request);
// Receive messages.
StreamingPullResponse response;
while (stream->Read(&response)) {
// Ack messages.
StreamingPullRequest ack_request;
for (const auto &message : response.received_messages()) {
ack_request.add_ack_ids(message.ack_id());
}
stream->Write(ack_request);
}
}
这是最新的 Cloud Pub/Sub API,并且是目前推荐的从服务中提取消息的方式;对于期望高吞吐量和低延迟的用户来说尤其如此。目前,没有现成的 C++ 客户端库,但在 GitHub 上有一个开放的 issue。其他语言的现有客户端库(例如 Java)已经使用此 API,因此您可以在自己的 C++ 代码中复制它们的功能。
对于更简单的 use-cases,您也可以使用旧的 Pull API, which makes many independent requests for messages. Note that, for high throughput and low latency, you should most likely be making many simultaneous asynchronous RPCs: see gRPC documentation。
#include <iostream>
#include <memory>
#include <grpc++/grpc++.h>
#include "google/pubsub/v1/pubsub.grpc.pb.h"
auto main() -> int {
using grpc::ClientContext;
using google::pubsub::v1::Subscriber;
using google::pubsub::v1::PullRequest;
using google::pubsub::v1::PullResponse;
auto creds = grpc::GoogleDefaultCredentials();
auto stub = std::make_unique<Subscriber::Stub>(
grpc::CreateChannel("pubsub.googleapis.com", creds));
PullRequest request;
request.set_subscription(
"projects/pubsub-cpp-api-1504713535863/subscriptions/testing");
request.set_max_messages(50);
request.set_return_immediately(false);
PullResponse response;
ClientContext ctx;
auto status = stub->Pull(&ctx, request, &response);
if (!status.ok()) {
// ...
}
// Do something with "response".
}
作为最后的手段,您可以使用 Push 订阅,这只需要您在客户端上实施 HTTP 端点。但是,通常不建议这样做,除非您从多个订阅中展开,或者您的客户端无法发出传出请求。
有一个名为 pubsuber
的 c++ 库用于访问 google pubsub。
有关详细信息,请查看 here.
我正在尝试 Google Pub/Sub,我需要将它集成到 C++ 代码库中。
由于在 C++ 中没有对 Google Pub/Sub
的原生支持,我通过 gRPC
使用它。因此,我通过 protoc
.
pubsub.grpc.pb.h
、pubsub.grpc.pb.cc
、pubsub.pb.h
和 pubsub.pb.cc
文件
问题部分:由于缺少文档,如果有一个 C++ 示例会很有帮助。我为发布者部分找到了 example,但没有为订阅者部分找到。我试图深入研究其他语言的生成代码和示例,但出现了很多问题。订户部分有什么例子吗?或者可能有人已经有过这样的经历?
就像发出发布请求一样,您可以发出 StreamingPull 消息请求。请注意,这是一个简单的概念证明,在实践中,您可能希望使这段代码更健壮;例如创建多个流,在线程池上进行消息处理,实现某种流量控制等……
#include <iostream>
#include <memory>
#include <grpc++/grpc++.h>
#include "google/pubsub/v1/pubsub.grpc.pb.h"
auto main() -> int {
using grpc::ClientContext;
using grpc::ClientReaderWriter;
using google::pubsub::v1::Subscriber;
using google::pubsub::v1::StreamingPullRequest;
using google::pubsub::v1::StreamingPullResponse;
auto creds = grpc::GoogleDefaultCredentials();
auto stub = std::make_unique<Subscriber::Stub>(
grpc::CreateChannel("pubsub.googleapis.com", creds));
// Open up the stream.
ClientContext context;
std::unique_ptr<ClientReaderWriter<
StreamingPullRequest, StreamingPullResponse>> stream(
stub->StreamingPull(&context));
// Send initial message.
StreamingPullRequest request;
request.set_subscription(
"projects/pubsub-cpp-api-1504713535863/subscriptions/testing");
request.set_stream_ack_deadline_seconds(10);
stream->Write(request);
// Receive messages.
StreamingPullResponse response;
while (stream->Read(&response)) {
// Ack messages.
StreamingPullRequest ack_request;
for (const auto &message : response.received_messages()) {
ack_request.add_ack_ids(message.ack_id());
}
stream->Write(ack_request);
}
}
这是最新的 Cloud Pub/Sub API,并且是目前推荐的从服务中提取消息的方式;对于期望高吞吐量和低延迟的用户来说尤其如此。目前,没有现成的 C++ 客户端库,但在 GitHub 上有一个开放的 issue。其他语言的现有客户端库(例如 Java)已经使用此 API,因此您可以在自己的 C++ 代码中复制它们的功能。
对于更简单的 use-cases,您也可以使用旧的 Pull API, which makes many independent requests for messages. Note that, for high throughput and low latency, you should most likely be making many simultaneous asynchronous RPCs: see gRPC documentation。
#include <iostream>
#include <memory>
#include <grpc++/grpc++.h>
#include "google/pubsub/v1/pubsub.grpc.pb.h"
auto main() -> int {
using grpc::ClientContext;
using google::pubsub::v1::Subscriber;
using google::pubsub::v1::PullRequest;
using google::pubsub::v1::PullResponse;
auto creds = grpc::GoogleDefaultCredentials();
auto stub = std::make_unique<Subscriber::Stub>(
grpc::CreateChannel("pubsub.googleapis.com", creds));
PullRequest request;
request.set_subscription(
"projects/pubsub-cpp-api-1504713535863/subscriptions/testing");
request.set_max_messages(50);
request.set_return_immediately(false);
PullResponse response;
ClientContext ctx;
auto status = stub->Pull(&ctx, request, &response);
if (!status.ok()) {
// ...
}
// Do something with "response".
}
作为最后的手段,您可以使用 Push 订阅,这只需要您在客户端上实施 HTTP 端点。但是,通常不建议这样做,除非您从多个订阅中展开,或者您的客户端无法发出传出请求。
有一个名为 pubsuber
的 c++ 库用于访问 google pubsub。
有关详细信息,请查看 here.