为什么我的 Grpc ClientInterceptor 没有正确的反应器上下文?
Why does my Grpc ClientInterceptor not have the right reactor Context?
我正在尝试了解反应器上下文以及我收到此错误的原因:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.NoSuchElementException: Context does not contain key
我有一个 WebFilter 可以写入一些上下文
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.contextWrite(Context.of("CONTEXT-HEADER", "foobar"));
}
我有一个试图读取它的 ClientInterceptor
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Mono.deferContextual(context -> {
Metadata.Key < String > key =
Metadata.Key.of("CONTEXT-HEADER", Metadata.ASCII_STRING_MARSHALLER);
headers.put(key, context.get("CONTEXT-HEADER"));
delegate().start(responseListener, headers);
return Mono.empty();
}).subscribe();
}
};
}
以防万一,我在创建 grpc 存根时添加了这个拦截器。
MyServiceGrpc.newBlockingStub(channel).withInterceptors(new MyClientInterceptor());
我的 grpc 调用是从 Controller 方法完成的
@GetMapping(path="test")
public Mono<String> test() {
return Mono.just(grpcStub.call(RequestBuilder.build()));
}
我在 context.get("CONTEXT-HEADER")
调用中遇到上述错误。
我应该注意到,在 Controller 方法中调用时,从 Context 获取 header 的相同代码工作正常。但是在调用调用 ClientInterceptor
的 grpc 方法之后,反应器管道似乎在某个时候中断了
是什么导致反应器链断裂?
Reactor 上下文通过订阅链从下到上传播。
订阅从底部开始,在每一步订阅者都将其当前上下文提供给发布者。在下一步中,发布者成为订阅者并将上下文传播到下一个发布者等。参见 reactor.core.CorePubliser
和 reactor.core.CoreSubscriber
和 reactor context docs。
例如在下面的链中
1: Mono.just(...);
2: .flatMap(
...
3: Mono.just(...).subscribe()
...
)
4: .contextWrite(...);
5: .flatMap(...);
6: .subscribe();
Reactor 操作符从反应流中封装发布者和订阅者。
第 1 行的操作员是发布者,第 6 行的操作员是订阅者,第 2、4、5 行的操作员都是。
上下文在第 5,6 行为空;在第 4 行填充,并包含 1 和 2 处的运算符可以使用填充的上下文。
第 3 行看不到上下文,因为上下文“不订阅”它。即使它在运算符内部,也可以访问上下文。
在您的拦截器中,就像第 3 行一样,您订阅 'manually' 而您的订阅只是没有任何上下文。
Mono.deferContextual(context -> {
Metadata.Key < String > key =
Metadata.Key.of("CONTEXT-HEADER", Metadata.ASCII_STRING_MARSHALLER);
headers.put(key, context.get("CONTEXT-HEADER"));
delegate().start(responseListener, headers);
return Mono.empty();
}).subscribe();
我不知道有任何 'out-of-the-box' 拦截器可以访问反应器上下文,但是您可以尝试在调用存根之前在反应器上下文和 grpc 上下文之间进行手动连接。这个integration tests in reactive-grpc可能会给出一个使用grpc Context的例子。
我正在尝试了解反应器上下文以及我收到此错误的原因:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.NoSuchElementException: Context does not contain key
我有一个 WebFilter 可以写入一些上下文
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.contextWrite(Context.of("CONTEXT-HEADER", "foobar"));
}
我有一个试图读取它的 ClientInterceptor
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Mono.deferContextual(context -> {
Metadata.Key < String > key =
Metadata.Key.of("CONTEXT-HEADER", Metadata.ASCII_STRING_MARSHALLER);
headers.put(key, context.get("CONTEXT-HEADER"));
delegate().start(responseListener, headers);
return Mono.empty();
}).subscribe();
}
};
}
以防万一,我在创建 grpc 存根时添加了这个拦截器。
MyServiceGrpc.newBlockingStub(channel).withInterceptors(new MyClientInterceptor());
我的 grpc 调用是从 Controller 方法完成的
@GetMapping(path="test")
public Mono<String> test() {
return Mono.just(grpcStub.call(RequestBuilder.build()));
}
我在 context.get("CONTEXT-HEADER")
调用中遇到上述错误。
我应该注意到,在 Controller 方法中调用时,从 Context 获取 header 的相同代码工作正常。但是在调用调用 ClientInterceptor
是什么导致反应器链断裂?
Reactor 上下文通过订阅链从下到上传播。
订阅从底部开始,在每一步订阅者都将其当前上下文提供给发布者。在下一步中,发布者成为订阅者并将上下文传播到下一个发布者等。参见 reactor.core.CorePubliser
和 reactor.core.CoreSubscriber
和 reactor context docs。
例如在下面的链中
1: Mono.just(...);
2: .flatMap(
...
3: Mono.just(...).subscribe()
...
)
4: .contextWrite(...);
5: .flatMap(...);
6: .subscribe();
Reactor 操作符从反应流中封装发布者和订阅者。
第 1 行的操作员是发布者,第 6 行的操作员是订阅者,第 2、4、5 行的操作员都是。
上下文在第 5,6 行为空;在第 4 行填充,并包含 1 和 2 处的运算符可以使用填充的上下文。
第 3 行看不到上下文,因为上下文“不订阅”它。即使它在运算符内部,也可以访问上下文。
在您的拦截器中,就像第 3 行一样,您订阅 'manually' 而您的订阅只是没有任何上下文。
Mono.deferContextual(context -> {
Metadata.Key < String > key =
Metadata.Key.of("CONTEXT-HEADER", Metadata.ASCII_STRING_MARSHALLER);
headers.put(key, context.get("CONTEXT-HEADER"));
delegate().start(responseListener, headers);
return Mono.empty();
}).subscribe();
我不知道有任何 'out-of-the-box' 拦截器可以访问反应器上下文,但是您可以尝试在调用存根之前在反应器上下文和 grpc 上下文之间进行手动连接。这个integration tests in reactive-grpc可能会给出一个使用grpc Context的例子。