使用 gRPC 客户端执行重新连接的正确方法
Correct way to perform a reconnect with gRPC client
我有一个 Go gRPC 客户端,它连接到运行在我的 k8s 集群中另一个 pod 里的 gRPC 服务器。
它运行良好,正在接收和处理请求。
我现在想知道在 gRPC 服务器 pod 被回收的情况下如何最好地实现弹性。
据我所知,clientconn.go 代码应该自动处理重新连接,但我就是无法让它工作,我担心我的实现一开始就不正确。
从主调用代码:
// Run request processing on its own goroutine so the caller is not blocked
// while ProcessRequests loops over the stream.
go func() {
	if err := gRPCClient.ProcessRequests(); err != nil {
		// Attach the error itself; the bare message alone does not say why
		// processing stopped.
		log.WithFields(log.Fields{"error": err}).Error("Error while processing Requests")
		//do something here??
	}
}()
我在 gRPCClient 包装器模块中的代码:
// ProcessRequests receives requests from the stream in a loop and closes the
// client when it returns.
//
// It returns nil when the stream ends with io.EOF, and the receive error
// otherwise, so that the caller's `err != nil` branch can actually observe a
// dropped connection.
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()
	for {
		// NOTE(review): reqclient is not declared in this snippet; presumably
		// it holds the active stream. Confirm against the rest of the file.
		request, err := reqclient.stream.Recv()
		if err == io.EOF {
			// The stream was ended cleanly; nothing more to receive.
			return nil
		}
		if err != nil {
			// When the pod is recycled, this is what's hit with err:
			//   rpc error: code = Unavailable desc = transport is closing
			// Return the error instead of swallowing it. Recovery (awaiting
			// the connection and resuming processing) is left to the caller,
			// which is the open question this snippet raises.
			return err
		}
		// Log only after the error checks, so a failed Recv is not reported
		// as a received request.
		log.Info("Request received")
		//the happy path
		//code block to process any requests that are received
	}
}
// Close closes the connection held in the client's conn field.
//
// Author's observation: this is called soon after the connection drops.
func (rc *RequestClient) Close() {
	// Any result of conn.Close is not inspected here.
	rc.conn.Close()
}
编辑:
Emin Laletovic 在下面优雅地回答了我的问题,帮我解决了大部分问题。
我不得不对 waitUntilReady 函数进行一些更改:
// waitUntilReady blocks until the connection reports connectivity.Ready or
// the timeout elapses.
//
// It returns true once the state is Ready and false if the timeout expires
// first.
//
// NOTE(review): depending on the grpc-go version, a channel that lost its
// connection may stay IDLE until an RPC or conn.Connect() triggers a new
// attempt. Verify that reconnection is actually kicked off while this waits.
func (grpcclient *gRPCClient) waitUntilReady() bool {
	// Define how long to wait for the connection to be restored before giving
	// up. One local value feeds both the context and the log field, so the
	// logged timeout is always the one being enforced.
	timeoutDuration := 300 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel()

	currentState := grpcclient.conn.GetState()
	for currentState != connectivity.Ready {
		// WaitForStateChange returns true when the state has changed from
		// currentState, and false when ctx expired first.
		if !grpcclient.conn.WaitForStateChange(ctx, currentState) {
			log.Error("Connection attempt has timed out.")
			return false
		}
		currentState = grpcclient.conn.GetState()
		log.WithFields(log.Fields{"state: ": currentState, "timeout": timeoutDuration}).Info("Attempting reconnection. State has changed to:")
	}
	return true
}
RPC 连接由 clientconn.go 自动处理,但这并不意味着流也会被自动处理。
流一旦断开,无论是 RPC 连接断开还是其他原因,都无法自动重新连接,一旦 RPC 连接恢复,您需要从服务器获取新流。
等待 RPC 连接处于 READY 状态并建立新流的伪代码可能如下所示:
// ProcessRequests starts a process goroutine and then supervises it through
// the done and reconnect channels, closing the client when it returns.
//
// A signal on done ends supervision with a nil error. A signal on reconnect
// makes it wait for the connection via waitUntilReady and, on success, start
// a fresh process goroutine; if the wait fails, an error is returned.
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	go grpcclient.process()
	for {
		select {
		case <-grpcclient.done:
			return nil
		case <-grpcclient.reconnect:
			if grpcclient.waitUntilReady() {
				// Connection is back: hand the work to a new goroutine,
				// which obtains its own stream.
				go grpcclient.process()
				continue
			}
			return errors.New("failed to establish a connection within the defined timeout")
		}
	}
}
// process obtains a stream via GetStream and receives from it until Recv
// fails.
//
// On io.EOF it sends true on grpcclient.done; on any other error it sends
// true on grpcclient.reconnect. In both cases it returns right after the send.
func (grpcclient *gRPCClient) process() {
	reqclient := GetStream() //always get a new stream
	for {
		request, err := reqclient.stream.Recv()
		if err == io.EOF {
			grpcclient.done <- true
			return
		}
		if err != nil {
			grpcclient.reconnect <- true
			return
		}
		// Log only after the error checks, so a failed Recv is not reported
		// as a received request.
		log.Info("Request received")
		//the happy path
		//code block to process any requests that are received
	}
}
// waitUntilReady blocks until the connection reports connectivity.Ready or a
// 60-second timeout elapses.
//
// It returns true once the state is Ready and false if the timeout expires
// first.
//
// WaitForStateChange(ctx, s) reports a change away from s, so passing
// connectivity.Ready would wait for the connection to leave READY. Instead,
// the current state is polled and each change is waited on until READY is
// reached.
//
// NOTE(review): depending on the grpc-go version, a channel that lost its
// connection may stay IDLE until an RPC or conn.Connect() triggers a new
// attempt. Verify that reconnection is actually kicked off while this waits.
func (grpcclient *gRPCClient) waitUntilReady() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) //define how long you want to wait for connection to be restored before giving up
	defer cancel()

	for state := grpcclient.conn.GetState(); state != connectivity.Ready; state = grpcclient.conn.GetState() {
		// false means ctx expired before the state moved away from `state`.
		if !grpcclient.conn.WaitForStateChange(ctx, state) {
			return false
		}
	}
	return true
}
我有一个 Go gRPC 客户端,它连接到运行在我的 k8s 集群中另一个 pod 里的 gRPC 服务器。
它运行良好,正在接收和处理请求。
我现在想知道在 gRPC 服务器 pod 被回收的情况下如何最好地实现弹性。
据我所知,clientconn.go 代码应该自动处理重新连接,但我就是无法让它工作,我担心我的实现一开始就不正确。
从主调用代码:
// Run request processing on its own goroutine so the caller is not blocked
// while ProcessRequests loops over the stream.
go func() {
	if err := gRPCClient.ProcessRequests(); err != nil {
		// Attach the error itself; the bare message alone does not say why
		// processing stopped.
		log.WithFields(log.Fields{"error": err}).Error("Error while processing Requests")
		//do something here??
	}
}()
我在 gRPCClient 包装器模块中的代码:
// ProcessRequests receives requests from the stream in a loop and closes the
// client when it returns.
//
// It returns nil when the stream ends with io.EOF, and the receive error
// otherwise, so that the caller's `err != nil` branch can actually observe a
// dropped connection.
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()
	for {
		// NOTE(review): reqclient is not declared in this snippet; presumably
		// it holds the active stream. Confirm against the rest of the file.
		request, err := reqclient.stream.Recv()
		if err == io.EOF {
			// The stream was ended cleanly; nothing more to receive.
			return nil
		}
		if err != nil {
			// When the pod is recycled, this is what's hit with err:
			//   rpc error: code = Unavailable desc = transport is closing
			// Return the error instead of swallowing it. Recovery (awaiting
			// the connection and resuming processing) is left to the caller,
			// which is the open question this snippet raises.
			return err
		}
		// Log only after the error checks, so a failed Recv is not reported
		// as a received request.
		log.Info("Request received")
		//the happy path
		//code block to process any requests that are received
	}
}
// Close closes the connection held in the client's conn field.
//
// Author's observation: this is called soon after the connection drops.
func (rc *RequestClient) Close() {
	// Any result of conn.Close is not inspected here.
	rc.conn.Close()
}
编辑: Emin Laletovic 在下面优雅地回答了我的问题,帮我解决了大部分问题。 我不得不对 waitUntilReady 函数进行一些更改:
// waitUntilReady blocks until the connection reports connectivity.Ready or
// the timeout elapses.
//
// It returns true once the state is Ready and false if the timeout expires
// first.
//
// NOTE(review): depending on the grpc-go version, a channel that lost its
// connection may stay IDLE until an RPC or conn.Connect() triggers a new
// attempt. Verify that reconnection is actually kicked off while this waits.
func (grpcclient *gRPCClient) waitUntilReady() bool {
	// Define how long to wait for the connection to be restored before giving
	// up. One local value feeds both the context and the log field, so the
	// logged timeout is always the one being enforced.
	timeoutDuration := 300 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel()

	currentState := grpcclient.conn.GetState()
	for currentState != connectivity.Ready {
		// WaitForStateChange returns true when the state has changed from
		// currentState, and false when ctx expired first.
		if !grpcclient.conn.WaitForStateChange(ctx, currentState) {
			log.Error("Connection attempt has timed out.")
			return false
		}
		currentState = grpcclient.conn.GetState()
		log.WithFields(log.Fields{"state: ": currentState, "timeout": timeoutDuration}).Info("Attempting reconnection. State has changed to:")
	}
	return true
}
RPC 连接由 clientconn.go 自动处理,但这并不意味着流也会被自动处理。
流一旦断开,无论是 RPC 连接断开还是其他原因,都无法自动重新连接,一旦 RPC 连接恢复,您需要从服务器获取新流。
等待 RPC 连接处于 READY 状态并建立新流的伪代码可能如下所示:
// ProcessRequests starts a process goroutine and then supervises it through
// the done and reconnect channels, closing the client when it returns.
//
// A signal on done ends supervision with a nil error. A signal on reconnect
// makes it wait for the connection via waitUntilReady and, on success, start
// a fresh process goroutine; if the wait fails, an error is returned.
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	go grpcclient.process()
	for {
		select {
		case <-grpcclient.done:
			return nil
		case <-grpcclient.reconnect:
			if grpcclient.waitUntilReady() {
				// Connection is back: hand the work to a new goroutine,
				// which obtains its own stream.
				go grpcclient.process()
				continue
			}
			return errors.New("failed to establish a connection within the defined timeout")
		}
	}
}
// process obtains a stream via GetStream and receives from it until Recv
// fails.
//
// On io.EOF it sends true on grpcclient.done; on any other error it sends
// true on grpcclient.reconnect. In both cases it returns right after the send.
func (grpcclient *gRPCClient) process() {
	reqclient := GetStream() //always get a new stream
	for {
		request, err := reqclient.stream.Recv()
		if err == io.EOF {
			grpcclient.done <- true
			return
		}
		if err != nil {
			grpcclient.reconnect <- true
			return
		}
		// Log only after the error checks, so a failed Recv is not reported
		// as a received request.
		log.Info("Request received")
		//the happy path
		//code block to process any requests that are received
	}
}
// waitUntilReady blocks until the connection reports connectivity.Ready or a
// 60-second timeout elapses.
//
// It returns true once the state is Ready and false if the timeout expires
// first.
//
// WaitForStateChange(ctx, s) reports a change away from s, so passing
// connectivity.Ready would wait for the connection to leave READY. Instead,
// the current state is polled and each change is waited on until READY is
// reached.
//
// NOTE(review): depending on the grpc-go version, a channel that lost its
// connection may stay IDLE until an RPC or conn.Connect() triggers a new
// attempt. Verify that reconnection is actually kicked off while this waits.
func (grpcclient *gRPCClient) waitUntilReady() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) //define how long you want to wait for connection to be restored before giving up
	defer cancel()

	for state := grpcclient.conn.GetState(); state != connectivity.Ready; state = grpcclient.conn.GetState() {
		// false means ctx expired before the state moved away from `state`.
		if !grpcclient.conn.WaitForStateChange(ctx, state) {
			return false
		}
	}
	return true
}