如何同时从 grpc 流读取和写入

How can I read and write from a grpc stream simultaneously

我现在正在实现Raft算法,想用gRPC流来实现。我的主要想法是为每个节点创建 3 个流到每个其他对等点,一个流将传输一种类型的 RPC,有 AppendEntriesRequestVoteInstallSnapshot。我在 route_guide 的有限帮助下编写了一些代码,因为在其双向流演示 RouteChat 中,客户端在开始读取之前发送了所有数据。

首先,我想随时写入一个流,所以写了下面的代码

void RaftMessagesStreamClientSync::AsyncRequestVote(const RequestVoteRequest& request){
    std::string peer_name = this->peer_name;
    debug("GRPC: Send RequestVoteRequest from %s to %s\n", request.name().c_str(), peer_name.c_str());
    request_vote_stream->Write(request);
}

与此同时,我希望线程继续从流中读取,如下面的代码,在构造 RaftMessagesStreamClientSync 后立即调用。

void RaftMessagesStreamClientSync::handle_response(){
    // strongThis is a must 
    auto strongThis = shared_from_this();
    t1 = new std::thread([strongThis](){
        RequestVoteResponse response;
        while (strongThis->request_vote_stream->Read(&response)) {
            debug("GRPC: Recv RequestVoteResponse from %s, me %s\n", response.name().c_str(), strongThis->raft_node->name.c_str());
            ...
        }
    });
    ...

为了初始化 3 个流,我必须这样写构造函数,我在这里使用 3 ClientContext 因为 document says one ClientContext for one RPC

struct RaftMessagesStreamClientSync : std::enable_shared_from_this<RaftMessagesStreamClientSync>{
    typedef grpc::ClientReaderWriter<RequestVoteRequest, RequestVoteResponse> CR;
    typedef grpc::ClientReaderWriter<AppendEntriesRequest, AppendEntriesResponse> CA;
    typedef grpc::ClientReaderWriter<InstallSnapshotRequest, InstallSnapshotResponse> CI;

    std::unique_ptr<CR> request_vote_stream;
    std::unique_ptr<CA> append_entries_stream;
    std::unique_ptr<CI> install_snapshot_stream;
    ClientContext context_r;
    ClientContext context_a;
    ClientContext context_i;
    std::thread * t1 = nullptr;
    std::thread * t2 = nullptr;
    std::thread * t3 = nullptr;
    ...
}
RaftMessagesStreamClientSync::RaftMessagesStreamClientSync(const char * addr, struct RaftNode * _raft_node) : raft_node(_raft_node), peer_name(addr) {
    std::shared_ptr<Channel> channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
    stub = raft_messages::RaftStreamMessages::NewStub(channel);
    // 1
    request_vote_stream = stub->RequestVote(&context_r);
    // 2
    append_entries_stream = stub->AppendEntries(&context_a);
    // 3
    install_snapshot_stream = stub->InstallSnapshot(&context_i);    
}
~RaftMessagesStreamClientSync() {
    raft_node = nullptr;
    t1->join();
    t2->join();
    t3->join();
    delete t1;
    delete t2;
    delete t3;
}

然后我实现服务器端

Status RaftMessagesStreamServiceImpl::RequestVote(ServerContext* context, ::grpc::ServerReaderWriter< ::raft_messages::RequestVoteResponse, RequestVoteRequest>* stream){
    RequestVoteResponse response;
    RequestVoteRequest request;
    while (stream->Read(&request)) {
        ...
    }

    return Status::OK;
}

然后出现2​​个问题:

  1. 当我测试 3 个节点时,实际上为每个节点创建 2 RaftMessagesStreamServiceImpl,从 1 到 3 的语句执行时间很长。
  2. 没有从服务器端收到 RPC。 使用 Bidi Aysnc Server 时也有类似的问题,但是我不知道这个 post 对我有什么帮助。

更新

经过一番调试,我发现request_vote_stream->Write(request) returns 0,根据document, means the stream is closed。但是为什么关闭了?

经过一番调试,我发现这两个问题都是因为我在创建服务端之前先创建客户端的问题。

因为我原来使用的是一元RPC调用,所以客户端之前的一次调用只会导致gRPC错误码14。程序继续运行是因为在创建服务器后发送的每个调用都可以正确处理。

但是,当涉及到流式调用时,stub->RequestVote(&context_r) 最终会调用一个阻塞函数 ClientReaderWriter::ClientReaderWriter,该函数将尝试连接到现在未创建的服务器。

/// Block to create a stream and write the initial metadata and \a request
/// out. Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
ClientReaderWriter(::grpc::ChannelInterface* channel,
                 const ::grpc::internal::RpcMethod& method,
                 ClientContext* context)
  : context_(context),
    cq_(grpc_completion_queue_attributes{
        GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
        GRPC_CQ_DEFAULT_POLLING}),  // Pluckable cq
    call_(channel->CreateCall(method, context, &cq_)) {
if (!context_->initial_metadata_corked_) {
  ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
      ops;
  ops.SendInitialMetadata(context->send_initial_metadata_,
                          context->initial_metadata_flags());
  call_.PerformOps(&ops);
  cq_.Pluck(&ops);
}
}

因此,连接尚未建立。