为什么我的 Grpc ClientInterceptor 没有正确的反应器上下文?

Why does my Grpc ClientInterceptor not have the right reactor Context?

我正在尝试了解反应器上下文以及我收到此错误的原因:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.NoSuchElementException: Context does not contain key

我有一个 WebFilter 可以写入一些上下文

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
  return chain.filter(exchange)
      .contextWrite(Context.of("CONTEXT-HEADER", "foobar"));
}

我有一个试图读取它的 ClientInterceptor

public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  final ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);

  return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {

      Mono.deferContextual(context -> {
        Metadata.Key < String > key =
            Metadata.Key.of("CONTEXT-HEADER", Metadata.ASCII_STRING_MARSHALLER);
        headers.put(key, context.get("CONTEXT-HEADER"));

        delegate().start(responseListener, headers);

        return Mono.empty();
      }).subscribe();


    }
  };
}

以防万一,我在创建 grpc 存根时添加了这个拦截器。

MyServiceGrpc.newBlockingStub(channel).withInterceptors(new MyClientInterceptor());

我的 grpc 调用是从 Controller 方法完成的

@GetMapping(path="test")
public Mono<String> test() {
  return Mono.just(grpcStub.call(RequestBuilder.build()));
}

我在 context.get("CONTEXT-HEADER") 调用中遇到上述错误。 我应该注意到,在 Controller 方法中调用时,从 Context 获取 header 的相同代码工作正常。但是在调用调用 ClientInterceptor

的 grpc 方法之后,反应器管道似乎在某个时候中断了

是什么导致反应器链断裂?

Reactor 上下文通过订阅链从下到上传播。

订阅从底部开始,在每一步订阅者都将其当前上下文提供给发布者。在下一步中,发布者成为订阅者并将上下文传播到下一个发布者等。参见 reactor.core.CorePubliserreactor.core.CoreSubscriberreactor context docs

例如在下面的链中

1: Mono.just(...);
2:     .flatMap(
           ... 
3:         Mono.just(...).subscribe()
           ...
         )
4:     .contextWrite(...);
5:     .flatMap(...);
6:     .subscribe();

Reactor 操作符从反应流中封装发布者和订阅者。

第 1 行的操作员是发布者,第 6 行的操作员是订阅者,第 2、4、5 行的操作员都是。

上下文在第 5,6 行为空;在第 4 行填充,并包含 1 和 2 处的运算符可以使用填充的上下文。

第 3 行看不到上下文,因为上下文“不订阅”它。即使它在运算符内部,也可以访问上下文。

在您的拦截器中,就像第 3 行一样,您订阅 'manually' 而您的订阅只是没有任何上下文。

Mono.deferContextual(context -> {
        Metadata.Key < String > key =
            Metadata.Key.of("CONTEXT-HEADER", Metadata.ASCII_STRING_MARSHALLER);
        headers.put(key, context.get("CONTEXT-HEADER"));

        delegate().start(responseListener, headers);

        return Mono.empty();
      }).subscribe();

我不知道有任何 'out-of-the-box' 拦截器可以访问反应器上下文,但是您可以尝试在调用存根之前在反应器上下文和 grpc 上下文之间进行手动连接。这个integration tests in reactive-grpc可能会给出一个使用grpc Context的例子。