ServerInterceptor 中的 grpc-java 异步调用
grpc-java async call in ServerInterceptor
我有一个 grpc-java 服务器,需要在处理请求之前对 auth 服务进行异步调用。我认为这应该在拦截器中完成,但它需要同步 return 来自 interceptCall()
的监听器
class AuthInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next
) {
String token = ""; //get token from headers
authService.authorize(token).subscribe(
ok -> // process the request
error -> call.close(Status.UNAUTHENTICATED, headers)
);
// Here we need to return a Listener, but we haven't started a call yet
}
}
所以问题是:如何从 ServerInterceptor 进行异步调用,如果不能完成,那么异步验证 grpc 请求的正确方法是什么?我知道它可以直接在带有 StreamObservers 的 grpc 服务中完成,但是请求授权是一个横切关注点,拦截器似乎是一个完美的地方。
您确实需要 return 一个 ServerCall.Listener
。但是由于您不知道要委托给 Listener
,您可以覆盖 Listener
中的每个方法以将回调添加到队列中。身份验证完成后,清空队列。
class DelayedListener<ReqT> extends Listener<ReqT> {
private Listener<ReqT> delegate;
private List<Runnable> events = new ArrayList<Runnable>();
@Override public synchronized void onMessage(ReqT message) {
if (delegate == null) {
events.add(() -> delegate.onMessage(message));
} else {
delegate.onMessage(message);
}
}
...
public synchronized void setDelegate(Listener<ReqT> delegate) {
this.delegate = delegate;
for (Runnable runnable : events) {
runnable.run();
}
events = null;
}
}
我有一个 grpc-java 服务器,需要在处理请求之前对 auth 服务进行异步调用。我认为这应该在拦截器中完成,但它需要同步 return 来自 interceptCall()
的监听器class AuthInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next
) {
String token = ""; //get token from headers
authService.authorize(token).subscribe(
ok -> // process the request
error -> call.close(Status.UNAUTHENTICATED, headers)
);
// Here we need to return a Listener, but we haven't started a call yet
}
}
所以问题是:如何从 ServerInterceptor 进行异步调用,如果不能完成,那么异步验证 grpc 请求的正确方法是什么?我知道它可以直接在带有 StreamObservers 的 grpc 服务中完成,但是请求授权是一个横切关注点,拦截器似乎是一个完美的地方。
您确实需要 return 一个 ServerCall.Listener
。但是由于您不知道要委托给 Listener
,您可以覆盖 Listener
中的每个方法以将回调添加到队列中。身份验证完成后,清空队列。
class DelayedListener<ReqT> extends Listener<ReqT> {
private Listener<ReqT> delegate;
private List<Runnable> events = new ArrayList<Runnable>();
@Override public synchronized void onMessage(ReqT message) {
if (delegate == null) {
events.add(() -> delegate.onMessage(message));
} else {
delegate.onMessage(message);
}
}
...
public synchronized void setDelegate(Listener<ReqT> delegate) {
this.delegate = delegate;
for (Runnable runnable : events) {
runnable.run();
}
events = null;
}
}