ServerInterceptor 中的 grpc-java 异步调用

grpc-java async call in ServerInterceptor

我有一个 grpc-java 服务器,需要在处理请求之前对 auth 服务进行异步调用。我认为这应该在拦截器中完成,但它需要同步 return 来自 interceptCall()

的监听​​器
class AuthInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        Metadata headers,
        ServerCallHandler<ReqT, RespT> next
    ) {
        String token = ""; //get token from headers
        authService.authorize(token).subscribe(
            ok -> // process the request
            error -> call.close(Status.UNAUTHENTICATED, headers)
        );
        // Here we need to return a Listener, but we haven't started a call yet
    }
}

所以问题是:如何从 ServerInterceptor 进行异步调用,如果不能完成,那么异步验证 grpc 请求的正确方法是什么?我知道它可以直接在带有 StreamObservers 的 grpc 服务中完成,但是请求授权是一个横切关注点,拦截器似乎是一个完美的地方。

您确实需要 return 一个 ServerCall.Listener。但是由于您不知道要委托给 Listener,您可以覆盖 Listener 中的每个方法以将回调添加到队列中。身份验证完成后,清空队列。

class DelayedListener<ReqT> extends Listener<ReqT> {
  private Listener<ReqT> delegate;
  private List<Runnable> events = new ArrayList<Runnable>();

  @Override public synchronized void onMessage(ReqT message) {
    if (delegate == null) {
      events.add(() -> delegate.onMessage(message));
    } else {
      delegate.onMessage(message);
    }
  }
  ...
  public synchronized void setDelegate(Listener<ReqT> delegate) {
    this.delegate = delegate;
    for (Runnable runnable : events) {
      runnable.run();
    }
    events = null;
  }
}