双向 gRPC 流有时会在停止和启动后停止处理响应
Bidirectional gRPC stream sometimes stops processing responses after stopping and starting
简而言之
我们有一个移动应用程序,它通过各种双向流将相当大量的数据流式传输到服务器和从服务器流式传输。有时需要关闭流(例如,当应用程序处于后台时)。然后根据需要重新打开它们。有时发生这种情况时,会出现问题:
- 据我所知,流已启动并且 运行 在设备端(所涉及的 GRPCProtocall 和 GRXWriter 的状态要么已启动,要么已暂停)
- 设备在流上发送数据正常(服务器接收数据)
- 服务器似乎可以正常将数据发送回设备(服务器的 Stream.Send 调用 return 成功)
- 在设备上,从不调用流中接收到的数据的结果处理程序
更多详情
我们的代码在下面得到了极大的简化,但这应该能提供足够的细节来说明我们在做什么。双向流由 Switch
class:
管理
class Switch {
/** The protocall over which we send and receive data */
var protocall: GRPCProtoCall?
/** The writer object that writes data to the protocall. */
var writer: GRXBufferedPipe?
/** A static GRPCProtoService as per the .proto */
static let service = APPDataService(host: Settings.grpcHost)
/** A response handler. APPData is the datatype defined by the .proto. */
func rpcResponse(done: Bool, response: APPData?, error: Error?) {
NSLog("Response received")
// Handle response...
}
func start() {
// Create a (new) instance of the writer
// (A writer cannot be used on multiple protocalls)
self.writer = GRXBufferedPipe()
// Setup the protocall
self.protocall = Switch.service.rpcToStream(withRequestWriter: self.writer!, eventHandler: self.rpcRespose(done:response:error:))
// Start the stream
self.protocall.start()
}
func stop() {
// Stop the writer if it is started.
if self.writer.state == .started || self.writer.state == .paused {
self.writer.finishWithError(nil)
}
// Stop the proto call if it is started
if self.protocall?.state == .started || self.protocall?.state == .paused {
protocall?.cancel()
}
self.protocall = nil
}
private var needsRestart: Bool {
if let protocall = self.protocall {
if protocall.state == .notStarted || protocall.state == .finished {
// protocall exists, but isn't running.
return true
} else if writer.state == .notStarted || writer.state == .finished {
// writer isn't running
return true
} else {
// protocall and writer are running
return false
}
} else {
// protocall doesn't exist.
return true
}
}
func restartIfNeeded() {
guard self.needsRestart else { return }
self.stop()
self.start()
}
func write(data: APPData) {
self.writer.writeValue(data)
}
}
就像我说的,大大简化了,但它显示了我们如何启动、停止和重新启动流,以及我们如何检查流是否健康。
当应用程序在后台运行时,我们调用 stop()
。当它被置于前台并且我们再次需要流时,我们调用 start()
。我们定期调用 restartIfNeeded()
,例如。当使用流的屏幕出现时。
正如我上面提到的,偶尔会发生的情况是,当服务器将数据写入流时,我们的响应处理程序 (rpcResponse
) 会停止调用。流似乎是健康的(服务器接收到我们写入的数据,并且 protocall.state
既不是 .notStarted 也不是 .finished)。但是连响应处理程序第一行的日志都没有执行。
第一个问题:我们是否正确管理流,或者我们停止和重新启动流的方式是否容易出错?如果是这样,做这样的事情的正确方法是什么?
第二个问题:我们如何调试这个?我们能想到的所有可以查询状态的信息都告诉我们流已启动并且 运行,但感觉 objc gRPC 库对我们隐藏了很多机制。有没有办法查看来自服务器的响应是否可以 到达我们,但未能触发我们的响应处理程序?
第三个问题:根据上面的代码,我们使用了库提供的GRXBufferedPipe。它的文档建议不要在生产中使用它,因为它没有后推机制。据我们了解,编写器仅用于以同步的、一次一个的方式将数据提供给 gRPC 核心,并且由于服务器可以很好地接收来自我们的数据,我们认为这不是问题。我们错了吗?作者是否也参与将从服务器接收到的数据提供给我们的响应处理程序? IE。如果写入器因过载而损坏,是否会表现为从流中读取数据而不是写入数据时出现问题?
更新:问这个问题一年多后,我们终于在我们的服务器端代码中发现了一个导致客户端出现这种行为的死锁错误。流似乎挂起,因为客户端发送的通信没有被服务器处理,反之亦然,但流实际上是活跃的。接受的答案为如何管理这些双向流提供了很好的建议,我认为这仍然很有价值(它帮助了我们很多!)。但问题实际上是由于编程错误。
此外,对于 运行 遇到此类问题的任何人,可能值得调查一下您是否遇到 this known issue where a channel gets silently dropped when iOS changes its network. This readme 提供了使用 Apple 的 CFStream API 而不是 TCP 套接字的说明作为该问题的可能解决方案。
First question: Are we managing the streams correctly, or is our way of stopping and restarting streams prone to errors? If so, what is the correct way of doing something like this?
根据我查看您的代码可以看出,start()
函数似乎是正确的。在stop()
函数中,不需要调用self.protocall
的cancel()
;通话将以之前的 self.writer.finishWithError(nil)
.
结束
needsrestart()
有点乱。首先,您不应该 poll/set 自己 protocall
的状态。这种状态是自己改变的。其次,设置这些状态不会关闭您的流。它只会暂停编写器,如果应用程序在后台,则暂停编写器就像 no-op。如果你想关闭一个流,你应该使用 finishWithError
来终止这个调用,并可能在需要的时候开始一个新的调用。
Second question: How do we debug this?
一种方法是打开 gRPC 日志(GRPC_TRACE 和 GRPC_VERBOSITY)。另一种方法是在 here 处设置断点,其中 gRPC objc 库从服务器接收 gRPC 消息。
Third question: Is the writer also involved in feeding data received from server to our response handler?
没有。如果您创建一个缓冲管道并将其作为您的调用请求提供,它只会提供要发送到服务器的数据。接收路径由另一个编写器处理(实际上是您的 protocall
对象)。
我看不出有什么地方不鼓励在生产中使用 GRXBufferedPipe
。此实用程序的已知缺点是,如果您暂停编写器但继续使用 writeWithValue
向其写入数据,您最终会缓冲大量数据而无法刷新它们,这可能会导致内存问题。
简而言之
我们有一个移动应用程序,它通过各种双向流将相当大量的数据流式传输到服务器和从服务器流式传输。有时需要关闭流(例如,当应用程序处于后台时)。然后根据需要重新打开它们。有时发生这种情况时,会出现问题:
- 据我所知,流已启动并且 运行 在设备端(所涉及的 GRPCProtocall 和 GRXWriter 的状态要么已启动,要么已暂停)
- 设备在流上发送数据正常(服务器接收数据)
- 服务器似乎可以正常将数据发送回设备(服务器的 Stream.Send 调用 return 成功)
- 在设备上,从不调用流中接收到的数据的结果处理程序
更多详情
我们的代码在下面得到了极大的简化,但这应该能提供足够的细节来说明我们在做什么。双向流由 Switch
class:
class Switch {
/** The protocall over which we send and receive data */
var protocall: GRPCProtoCall?
/** The writer object that writes data to the protocall. */
var writer: GRXBufferedPipe?
/** A static GRPCProtoService as per the .proto */
static let service = APPDataService(host: Settings.grpcHost)
/** A response handler. APPData is the datatype defined by the .proto. */
func rpcResponse(done: Bool, response: APPData?, error: Error?) {
NSLog("Response received")
// Handle response...
}
func start() {
// Create a (new) instance of the writer
// (A writer cannot be used on multiple protocalls)
self.writer = GRXBufferedPipe()
// Setup the protocall
self.protocall = Switch.service.rpcToStream(withRequestWriter: self.writer!, eventHandler: self.rpcRespose(done:response:error:))
// Start the stream
self.protocall.start()
}
func stop() {
// Stop the writer if it is started.
if self.writer.state == .started || self.writer.state == .paused {
self.writer.finishWithError(nil)
}
// Stop the proto call if it is started
if self.protocall?.state == .started || self.protocall?.state == .paused {
protocall?.cancel()
}
self.protocall = nil
}
private var needsRestart: Bool {
if let protocall = self.protocall {
if protocall.state == .notStarted || protocall.state == .finished {
// protocall exists, but isn't running.
return true
} else if writer.state == .notStarted || writer.state == .finished {
// writer isn't running
return true
} else {
// protocall and writer are running
return false
}
} else {
// protocall doesn't exist.
return true
}
}
func restartIfNeeded() {
guard self.needsRestart else { return }
self.stop()
self.start()
}
func write(data: APPData) {
self.writer.writeValue(data)
}
}
就像我说的,大大简化了,但它显示了我们如何启动、停止和重新启动流,以及我们如何检查流是否健康。
当应用程序在后台运行时,我们调用 stop()
。当它被置于前台并且我们再次需要流时,我们调用 start()
。我们定期调用 restartIfNeeded()
,例如。当使用流的屏幕出现时。
正如我上面提到的,偶尔会发生的情况是,当服务器将数据写入流时,我们的响应处理程序 (rpcResponse
) 会停止调用。流似乎是健康的(服务器接收到我们写入的数据,并且 protocall.state
既不是 .notStarted 也不是 .finished)。但是连响应处理程序第一行的日志都没有执行。
第一个问题:我们是否正确管理流,或者我们停止和重新启动流的方式是否容易出错?如果是这样,做这样的事情的正确方法是什么?
第二个问题:我们如何调试这个?我们能想到的所有可以查询状态的信息都告诉我们流已启动并且 运行,但感觉 objc gRPC 库对我们隐藏了很多机制。有没有办法查看来自服务器的响应是否可以 到达我们,但未能触发我们的响应处理程序?
第三个问题:根据上面的代码,我们使用了库提供的GRXBufferedPipe。它的文档建议不要在生产中使用它,因为它没有后推机制。据我们了解,编写器仅用于以同步的、一次一个的方式将数据提供给 gRPC 核心,并且由于服务器可以很好地接收来自我们的数据,我们认为这不是问题。我们错了吗?作者是否也参与将从服务器接收到的数据提供给我们的响应处理程序? IE。如果写入器因过载而损坏,是否会表现为从流中读取数据而不是写入数据时出现问题?
更新:问这个问题一年多后,我们终于在我们的服务器端代码中发现了一个导致客户端出现这种行为的死锁错误。流似乎挂起,因为客户端发送的通信没有被服务器处理,反之亦然,但流实际上是活跃的。接受的答案为如何管理这些双向流提供了很好的建议,我认为这仍然很有价值(它帮助了我们很多!)。但问题实际上是由于编程错误。
此外,对于 运行 遇到此类问题的任何人,可能值得调查一下您是否遇到 this known issue where a channel gets silently dropped when iOS changes its network. This readme 提供了使用 Apple 的 CFStream API 而不是 TCP 套接字的说明作为该问题的可能解决方案。
First question: Are we managing the streams correctly, or is our way of stopping and restarting streams prone to errors? If so, what is the correct way of doing something like this?
根据我查看您的代码可以看出,start()
函数似乎是正确的。在stop()
函数中,不需要调用self.protocall
的cancel()
;通话将以之前的 self.writer.finishWithError(nil)
.
needsrestart()
有点乱。首先,您不应该 poll/set 自己 protocall
的状态。这种状态是自己改变的。其次,设置这些状态不会关闭您的流。它只会暂停编写器,如果应用程序在后台,则暂停编写器就像 no-op。如果你想关闭一个流,你应该使用 finishWithError
来终止这个调用,并可能在需要的时候开始一个新的调用。
Second question: How do we debug this?
一种方法是打开 gRPC 日志(GRPC_TRACE 和 GRPC_VERBOSITY)。另一种方法是在 here 处设置断点,其中 gRPC objc 库从服务器接收 gRPC 消息。
Third question: Is the writer also involved in feeding data received from server to our response handler?
没有。如果您创建一个缓冲管道并将其作为您的调用请求提供,它只会提供要发送到服务器的数据。接收路径由另一个编写器处理(实际上是您的 protocall
对象)。
我看不出有什么地方不鼓励在生产中使用 GRXBufferedPipe
。此实用程序的已知缺点是,如果您暂停编写器但继续使用 writeWithValue
向其写入数据,您最终会缓冲大量数据而无法刷新它们,这可能会导致内存问题。