C++ 中 Google Pub/Sub 的消费者示例

Consumer example for Google Pub/Sub in C++

我正在尝试 Google Pub/Sub,我需要将它集成到 C++ 代码库中。

由于在 C++ 中没有对 Google Pub/Sub 的原生支持,我通过 gRPC 使用它。因此,我通过 protoc.

生成了相应的 pubsub.grpc.pb.hpubsub.grpc.pb.ccpubsub.pb.hpubsub.pb.cc 文件

问题部分:由于缺少文档,如果有一个 C++ 示例会很有帮助。我为发布者部分找到了 example,但没有为订阅者部分找到。我试图深入研究其他语言的生成代码和示例,但出现了很多问题。订户部分有什么例子吗?或者可能有人已经有过这样的经历?

就像发出发布请求一样,您可以发出 StreamingPull 消息请求。请注意,这是一个简单的概念证明,在实践中,您可能希望使这段代码更健壮;例如创建多个流,在线程池上进行消息处理,实现某种流量控制等……

#include <iostream>
#include <memory>

#include <grpc++/grpc++.h>

#include "google/pubsub/v1/pubsub.grpc.pb.h"

auto main() -> int {
    using grpc::ClientContext;
    using grpc::ClientReaderWriter;
    using google::pubsub::v1::Subscriber;
    using google::pubsub::v1::StreamingPullRequest;
    using google::pubsub::v1::StreamingPullResponse;

    auto creds = grpc::GoogleDefaultCredentials();
    auto stub = std::make_unique<Subscriber::Stub>(
        grpc::CreateChannel("pubsub.googleapis.com", creds));

    // Open up the stream.
    ClientContext context;
    std::unique_ptr<ClientReaderWriter<
        StreamingPullRequest, StreamingPullResponse>> stream(
            stub->StreamingPull(&context));

    // Send initial message.
    StreamingPullRequest request;
    request.set_subscription(
        "projects/pubsub-cpp-api-1504713535863/subscriptions/testing");
    request.set_stream_ack_deadline_seconds(10);
    stream->Write(request);

    // Receive messages.
    StreamingPullResponse response;
    while (stream->Read(&response)) {
      // Ack messages.
      StreamingPullRequest ack_request;
      for (const auto &message : response.received_messages()) {
        ack_request.add_ack_ids(message.ack_id());
      }
      stream->Write(ack_request);
    }
}

这是最新的 Cloud Pub/Sub API,并且是目前推荐的从服务中提取消息的方式;对于期望高吞吐量和低延迟的用户来说尤其如此。目前,没有现成的 C++ 客户端库,但在 GitHub 上有一个开放的 issue。其他语言的现有客户端库(例如 Java)已经使用此 API,因此您可以在自己的 C++ 代码中复制它们的功能。

对于更简单的 use-cases,您也可以使用旧的 Pull API, which makes many independent requests for messages. Note that, for high throughput and low latency, you should most likely be making many simultaneous asynchronous RPCs: see gRPC documentation

#include <iostream>
#include <memory>

#include <grpc++/grpc++.h>

#include "google/pubsub/v1/pubsub.grpc.pb.h"

auto main() -> int {
    using grpc::ClientContext;
    using google::pubsub::v1::Subscriber;
    using google::pubsub::v1::PullRequest;
    using google::pubsub::v1::PullResponse;

    auto creds = grpc::GoogleDefaultCredentials();
    auto stub = std::make_unique<Subscriber::Stub>(
        grpc::CreateChannel("pubsub.googleapis.com", creds));

    PullRequest request;
    request.set_subscription(
        "projects/pubsub-cpp-api-1504713535863/subscriptions/testing");
    request.set_max_messages(50);
    request.set_return_immediately(false);

    PullResponse response;
    ClientContext ctx;

    auto status = stub->Pull(&ctx, request, &response);
    if (!status.ok()) {
        // ...
    }

    // Do something with "response".
}

作为最后的手段,您可以使用 Push 订阅,这只需要您在客户端上实施 HTTP 端点。但是,通常不建议这样做,除非您从多个订阅中展开,或者您的客户端无法发出传出请求。

有一个名为 pubsuber 的 c++ 库用于访问 google pubsub。 有关详细信息,请查看 here.