Spring boot + webflux:运行 一些步骤并行时上下文丢失
Spring boot + webflux: context lost when running some steps in parallel
Spring 开机:2.1.3.RELEASE
你好,
我正在尝试使用 spring webflux 的上下文功能来携带一个简单的变量。我有一个 WebFilter 设置了这样一个变量的上下文,我尝试在我 flux/stream 的不同阶段在我的控制器中使用它。在某些时候,我在调用 Flux class.
的方法 "parallel()" 后丢失了它
- 过滤器:
public class TestFilter implements WebFilter {
private Logger LOG = LoggerFactory.getLogger(TestFilter.class);
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.doOnEach(voidSignal -> System.out.println("filter:"+voidSignal.getContext().getOrEmpty("blob"))).subscriberContext(Context.of("blob", "kapoue"));
}
}
- 控制器:
@RestController
@RequestMapping(TestControllerWebFlux.ROOT)
public class TestControllerWebFlux {
static final String ROOT = "/flux";
static final String TEST = "/test";
private WebClient webClient = WebClient.create();
@GetMapping(
value = TEST,
produces = {MediaType.APPLICATION_JSON_VALUE})
public Mono<String> test() {
System.out.println("controller1:"+Thread.currentThread());
Flux<String> call = webClient.get().uri("http://localhost:" + 8080 + ROOT + "/test2").retrieve().bodyToFlux(Result.class).map(Result::getValue);
return call.map(s -> s+"0")
.doOnEach(stringSignal -> System.out.println("controller2:"+stringSignal.getContext().getOrEmpty("blob")))
.parallel()
.doOnEach(stringSignal -> System.out.println("controller3:"+stringSignal.getContext().getOrEmpty("blob")))
.map(s -> s+"0")
.doOnEach(stringSignal -> System.out.println("controller4:"+stringSignal.getContext().getOrEmpty("blob")))
.reduce((s, s2) -> s+s2)
.doOnEach(stringSignal -> System.out.println("controller5:"+stringSignal.getContext().getOrEmpty("blob")))
.map(s -> {
System.out.println("controller6:"+Thread.currentThread());
return s;
});
}
@GetMapping(
value = "test2",
produces = {MediaType.APPLICATION_JSON_VALUE})
public Flux<Result> test2() {
return Flux.just(new Result("0"), new Result("0"), new Result("0"));
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Result {
private String value;
}
}
我所做的就是调用 http://localhost:8080/flux/test/ 端点,我得到了:
controller1:Thread[reactor-http-nio-2,5,main]
controller2:Optional[kapoue]
controller3:Optional.empty
controller4:Optional.empty
controller2:Optional[kapoue]
controller3:Optional.empty
controller4:Optional.empty
controller2:Optional[kapoue]
controller3:Optional.empty
controller4:Optional.empty
controller2:Optional[kapoue]
controller3:Optional.empty
controller4:Optional.empty
controller3:Optional.empty
controller4:Optional.empty
controller3:Optional.empty
controller4:Optional.empty
controller3:Optional.empty
controller4:Optional.empty
controller5:Optional[kapoue]
controller6:Thread[reactor-http-nio-2,5,main]
filter:Optional[kapoue]
如您所见,上下文在 'parallel' 方法后立即丢失,并在减少后以某种方式恢复。
这是一个错误还是我不应该在像这样的调用之后尝试并行 运行 事情?
提前感谢您的帮助。
这看起来像是 Reactor 中的错误。我举报了:https://github.com/reactor/reactor-core/issues/1656
Spring 开机:2.1.3.RELEASE
你好,
我正在尝试使用 spring webflux 的上下文功能来携带一个简单的变量。我有一个 WebFilter 设置了这样一个变量的上下文,我尝试在我 flux/stream 的不同阶段在我的控制器中使用它。在某些时候,我在调用 Flux class.
的方法 "parallel()" 后丢失了它- 过滤器:
public class TestFilter implements WebFilter {
private Logger LOG = LoggerFactory.getLogger(TestFilter.class);
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.doOnEach(voidSignal -> System.out.println("filter:"+voidSignal.getContext().getOrEmpty("blob"))).subscriberContext(Context.of("blob", "kapoue"));
}
}
- 控制器:
@RestController
@RequestMapping(TestControllerWebFlux.ROOT)
public class TestControllerWebFlux {
static final String ROOT = "/flux";
static final String TEST = "/test";
private WebClient webClient = WebClient.create();
@GetMapping(
value = TEST,
produces = {MediaType.APPLICATION_JSON_VALUE})
public Mono<String> test() {
System.out.println("controller1:"+Thread.currentThread());
Flux<String> call = webClient.get().uri("http://localhost:" + 8080 + ROOT + "/test2").retrieve().bodyToFlux(Result.class).map(Result::getValue);
return call.map(s -> s+"0")
.doOnEach(stringSignal -> System.out.println("controller2:"+stringSignal.getContext().getOrEmpty("blob")))
.parallel()
.doOnEach(stringSignal -> System.out.println("controller3:"+stringSignal.getContext().getOrEmpty("blob")))
.map(s -> s+"0")
.doOnEach(stringSignal -> System.out.println("controller4:"+stringSignal.getContext().getOrEmpty("blob")))
.reduce((s, s2) -> s+s2)
.doOnEach(stringSignal -> System.out.println("controller5:"+stringSignal.getContext().getOrEmpty("blob")))
.map(s -> {
System.out.println("controller6:"+Thread.currentThread());
return s;
});
}
@GetMapping(
value = "test2",
produces = {MediaType.APPLICATION_JSON_VALUE})
public Flux<Result> test2() {
return Flux.just(new Result("0"), new Result("0"), new Result("0"));
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Result {
private String value;
}
}
我所做的就是调用 http://localhost:8080/flux/test/ 端点,我得到了:
controller1:Thread[reactor-http-nio-2,5,main] controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller5:Optional[kapoue] controller6:Thread[reactor-http-nio-2,5,main] filter:Optional[kapoue]
如您所见,上下文在 'parallel' 方法后立即丢失,并在减少后以某种方式恢复。
这是一个错误还是我不应该在像这样的调用之后尝试并行 运行 事情?
提前感谢您的帮助。
这看起来像是 Reactor 中的错误。我举报了:https://github.com/reactor/reactor-core/issues/1656