Spring-WebFlux 通量失败并显示上下文
Spring-WebFlux Flux fails with Context
我想在我的 Flux 管道中使用 Context
来绕过过滤。
这是我的:
public Flux<Bar> realtime(Flux<OHLCIntf> ohlcIntfFlux) {
return Flux.zip(
ohlcIntfFlux,
ohlcIntfFlux.skip(1),
Mono.subscriberContext().map(c -> c.getOrDefault("isRealtime", false))
)
.filter(l ->
l.getT3() ||
(!l.getT2().getEndTimeStr().equals(l.getT1().getEndTimeStr())))
.map(Tuple2::getT1)
.log()
.map(this::
}
这是这个的输入:
public void setRealtime(Flux<Bar> input) {
Flux.zip(input, Mono.subscriberContext())
.doOnComplete(() -> {
...
})
.doOnNext(t -> {
...
})
.subscribe()
}
我可以告诉我 ...
中的代码没有失败,我什至可以访问 Context
映射,但是当第一次迭代完成时,我得到:
onContextUpdate(Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@35d5ac51})
并且订阅者断开连接。
所以我的问题是我是否正确使用它,这里可能有什么问题?
编辑:
当我使用它的价值时,我已经尝试 repeat()
Mono.subscriberContext()
:
return Flux.zip(
ohlcIntfFlux,
ohlcIntfFlux.skip(1),
Mono.subscriberContext()
.map(c -> c.getOrDefault("isRealtime", new AtomicBoolean())).repeat()
)
.filter(l ->
l.getT3().get() ||
(!l.getT2().getEndTime().isEqual(l.getT1().getEndTime())))
.map(Tuple2::getT1)
并将 AtomicBoolean
设置为订阅者端的上下文,当我需要上游信号时,只需更改此变量引用中的值,但它根本没有改变:
input
.onErrorContinue((throwable, o) -> throwable.getMessage())
.doOnComplete(() -> {
System.out.println("Number of trades for the strategy: " + tradingRecord.getTradeCount());
// Analysis
System.out.println("Total profit for the strategy: " + new TotalProfitCriterion().calculate(timeSeries, tradingRecord));
})
.doOnNext(this::defaultRealtimeEvaluator)
.subscriberContext(Context.of("isRealtime", isRealtimeAtomic))
.subscribe();
至少重复 Flux
不会断开连接,但我从中获取的值不会更新。我没有其他线索。
Spring-webflux: 2.1.3.RELEASE
这个有效:
input
.onErrorContinue((throwable, o) -> throwable.getMessage())
.doOnComplete(() -> { ... }
.flatMap(bar -> Mono.subscriberContext()
.map(c -> Tuples.of(bar, c)))
.doOnNext(this::defaultRealtimeEvaluator)
.subscriberContext(Context.of("isRealtime", new AtomicBoolean()))
.subscribe();
所以重点是在我的例子中将 AtomicBoolean
设置为 cotnext,然后如果您想更改它的值,则从上下文中提取该变量。上游通量也一样。
我想在我的 Flux 管道中使用 Context
来绕过过滤。
这是我的:
public Flux<Bar> realtime(Flux<OHLCIntf> ohlcIntfFlux) {
return Flux.zip(
ohlcIntfFlux,
ohlcIntfFlux.skip(1),
Mono.subscriberContext().map(c -> c.getOrDefault("isRealtime", false))
)
.filter(l ->
l.getT3() ||
(!l.getT2().getEndTimeStr().equals(l.getT1().getEndTimeStr())))
.map(Tuple2::getT1)
.log()
.map(this::
}
这是这个的输入:
public void setRealtime(Flux<Bar> input) {
Flux.zip(input, Mono.subscriberContext())
.doOnComplete(() -> {
...
})
.doOnNext(t -> {
...
})
.subscribe()
}
我可以告诉我 ...
中的代码没有失败,我什至可以访问 Context
映射,但是当第一次迭代完成时,我得到:
onContextUpdate(Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@35d5ac51})
并且订阅者断开连接。
所以我的问题是我是否正确使用它,这里可能有什么问题?
编辑:
当我使用它的价值时,我已经尝试 repeat()
Mono.subscriberContext()
:
return Flux.zip(
ohlcIntfFlux,
ohlcIntfFlux.skip(1),
Mono.subscriberContext()
.map(c -> c.getOrDefault("isRealtime", new AtomicBoolean())).repeat()
)
.filter(l ->
l.getT3().get() ||
(!l.getT2().getEndTime().isEqual(l.getT1().getEndTime())))
.map(Tuple2::getT1)
并将 AtomicBoolean
设置为订阅者端的上下文,当我需要上游信号时,只需更改此变量引用中的值,但它根本没有改变:
input
.onErrorContinue((throwable, o) -> throwable.getMessage())
.doOnComplete(() -> {
System.out.println("Number of trades for the strategy: " + tradingRecord.getTradeCount());
// Analysis
System.out.println("Total profit for the strategy: " + new TotalProfitCriterion().calculate(timeSeries, tradingRecord));
})
.doOnNext(this::defaultRealtimeEvaluator)
.subscriberContext(Context.of("isRealtime", isRealtimeAtomic))
.subscribe();
至少重复 Flux
不会断开连接,但我从中获取的值不会更新。我没有其他线索。
Spring-webflux: 2.1.3.RELEASE
这个有效:
input
.onErrorContinue((throwable, o) -> throwable.getMessage())
.doOnComplete(() -> { ... }
.flatMap(bar -> Mono.subscriberContext()
.map(c -> Tuples.of(bar, c)))
.doOnNext(this::defaultRealtimeEvaluator)
.subscriberContext(Context.of("isRealtime", new AtomicBoolean()))
.subscribe();
所以重点是在我的例子中将 AtomicBoolean
设置为 cotnext,然后如果您想更改它的值,则从上下文中提取该变量。上游通量也一样。