双向 gRPC 流有时会在停止和启动后停止处理响应

Bidirectional gRPC stream sometimes stops processing responses after stopping and starting

简而言之

我们有一个移动应用程序,它通过各种双向流将相当大量的数据流式传输到服务器和从服务器流式传输。有时需要关闭流(例如,当应用程序处于后台时)。然后根据需要重新打开它们。有时发生这种情况时,会出现问题:

更多详情

我们的代码在下面得到了极大的简化,但这应该能提供足够的细节来说明我们在做什么。双向流由 Switch class:

管理
class Switch {
    /** The protocall over which we send and receive data */
    var protocall: GRPCProtoCall?

    /** The writer object that writes data to the protocall. */
    var writer: GRXBufferedPipe?

    /** A static GRPCProtoService as per the .proto */
    static let service = APPDataService(host: Settings.grpcHost)

    /** A response handler. APPData is the datatype defined by the .proto. */
    func rpcResponse(done: Bool, response: APPData?, error: Error?) {
        NSLog("Response received")
        // Handle response...
    }

    func start() {
        // Create a (new) instance of the writer
        // (A writer cannot be used on multiple protocalls)
        self.writer = GRXBufferedPipe()

        // Setup the protocall
        self.protocall = Switch.service.rpcToStream(withRequestWriter: self.writer!, eventHandler: self.rpcRespose(done:response:error:))

        // Start the stream
        self.protocall.start()
    }

    func stop() {
        // Stop the writer if it is started.
        if self.writer.state == .started || self.writer.state == .paused {
            self.writer.finishWithError(nil)
        }

        // Stop the proto call if it is started
        if self.protocall?.state == .started || self.protocall?.state == .paused {
            protocall?.cancel()
        }
        self.protocall = nil
    }

    private var needsRestart: Bool {
        if let protocall = self.protocall {
            if protocall.state == .notStarted || protocall.state == .finished {
                // protocall exists, but isn't running.
                return true
            } else if writer.state == .notStarted || writer.state == .finished {
                // writer isn't running
                return true
            } else {
                // protocall and writer are running
                return false
            }
        } else {
            // protocall doesn't exist.
            return true
        }
    }

    func restartIfNeeded() {
        guard self.needsRestart else { return }
        self.stop()
        self.start()
    }

    func write(data: APPData) {
        self.writer.writeValue(data)
    }
}

就像我说的,大大简化了,但它显示了我们如何启动、停止和重新启动流,以及我们如何检查流是否健康。

当应用程序在后台运行时,我们调用 stop()。当它被置于前台并且我们再次需要流时,我们调用 start()。我们定期调用 restartIfNeeded(),例如。当使用流的屏幕出现时。

正如我上面提到的,偶尔会发生的情况是,当服务器将数据写入流时,我们的响应处理程序 (rpcResponse) 会停止调用。流似乎是健康的(服务器接收到我们写入的数据,并且 protocall.state 既不是 .notStarted 也不是 .finished)。但是连响应处理程序第一行的日志都没有执行。

第一个问题:我们是否正确管理流,或者我们停止和重新启动流的方式是否容易出错?如果是这样,做这样的事情的正确方法是什么?

第二个问题:我们如何调试这个?我们能想到的所有可以查询状态的信息都告诉我们流已启动并且 运行,但感觉 objc gRPC 库对我们隐藏了很多机制。有没有办法查看来自服务器的响应是否可以 到达我们,但未能触发我们的响应处理程序?

第三个问题:根据上面的代码,我们使用了库提供的GRXBufferedPipe。它的文档建议不要在生产中使用它,因为它没有后推机制。据我们了解,编写器仅用于以同步的、一次一个的方式将数据提供给 gRPC 核心,并且由于服务器可以很好地接收来自我们的数据,我们认为这不是问题。我们错了吗?作者是否也参与将从服务器接收到的数据提供给我们的响应处理程序? IE。如果写入器因过载而损坏,是否会表现为从流中读取数据而不是写入数据时出现问题?

更新:问这个问题一年多后,我们终于在我们的服务器端代码中发现了一个导致客户端出现这种行为的死锁错误。流似乎挂起,因为客户端发送的通信没有被服务器处理,反之亦然,但流实际上是活跃的。接受的答案为如何管理这些双向流提供了很好的建议,我认为这仍然很有价值(它帮助了我们很多!)。但问题实际上是由于编程错误。

此外,对于 运行 遇到此类问题的任何人,可能值得调查一下您是否遇到 this known issue where a channel gets silently dropped when iOS changes its network. This readme 提供了使用 Apple 的 CFStream API 而不是 TCP 套接字的说明作为该问题的可能解决方案。

First question: Are we managing the streams correctly, or is our way of stopping and restarting streams prone to errors? If so, what is the correct way of doing something like this?

根据我查看您的代码可以看出,start() 函数似乎是正确的。在stop()函数中,不需要调用self.protocallcancel();通话将以之前的 self.writer.finishWithError(nil).

结束

needsrestart() 有点乱。首先,您不应该 poll/set 自己 protocall 的状态。这种状态是自己改变的。其次,设置这些状态不会关闭您的流。它只会暂停编写器,如果应用程序在后台,则暂停编写器就像 no-op。如果你想关闭一个流,你应该使用 finishWithError 来终止这个调用,并可能在需要的时候开始一个新的调用。

Second question: How do we debug this?

一种方法是打开 gRPC 日志(GRPC_TRACE 和 GRPC_VERBOSITY)。另一种方法是在 here 处设置断点,其中 gRPC objc 库从服务器接收 gRPC 消息。

Third question: Is the writer also involved in feeding data received from server to our response handler?

没有。如果您创建一个缓冲管道并将其作为您的调用请求提供,它只会提供要发送到服务器的数据。接收路径由另一个编写器处理(实际上是您的 protocall 对象)。

我看不出有什么地方不鼓励在生产中使用 GRXBufferedPipe。此实用程序的已知缺点是,如果您暂停编写器但继续使用 writeWithValue 向其写入数据,您最终会缓冲大量数据而无法刷新它们,这可能会导致内存问题。