Java 反应堆通量未按预期映射
Java reactor flux not mapping according expectation
我有以下演示应用程序设置:
- mongodb 包含 2 collections:1 个是加密货币,1 个是这些加密货币的汇率
- spring webflux 项目使用服务器发送事件获取这些汇率的实时更新
我有一项服务 returns List<CryptoCurrencyRateDTO>
的 Flux
基于加密货币 collection 中存在的货币。我为这些货币中的每一种生成一个随机汇率并将它们流式传输到 Web 客户端。
服务是这样的:
@Service
public class CryptoCurrencyRateService {
@Autowired private CryptoCurrencyRateRepository rateRepository;
@Autowired private CryptoCurrencyRepository currencyRepository;
// constructor
public Flux<List<CryptoCurrencyRateDTO>> realtimeRates() {
return currencyRepository.findAll()
.map(CryptoCurrency::getSymbol)
.flatMap(rateRepository::findTopBySymbolOrderByTimestamp)
.zipWith(
Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
(rate, timestamp) -> new CryptoCurrencyRate(rate.getSymbol(), timestamp, randomRateBasedOnPrevious )
)
.flatMap(rateRepository::save)
.map(rateMapper::toDto)
.collectList()
.delayElement(Duration.ofSeconds(5))
.repeat();
}
}
CryptoCurrencyRateRepository
如下:
@Repository
public interface CryptoCurrencyRateRepository extends ReactiveMongoRepository<CryptoCurrencyRate, String> {
Mono<CryptoCurrencyRate> findTopBySymbolOrderByTimestamp(String symbol);
}
然而,在调用 .flatMap(rateRepository::findTopBySymbolOrderByTimestamp)
之后,我只得到一个包含 1 个项目的 Flux
,而我认为我会得到一个 Flux
,其中包含 [=] 中每个符号的最高汇率20=] 调用,因为我的加密货币 collection 包含 3 种货币。
当我查看日志记录时,我发现对 findTopBySymbolOrderByTimestamp
的调用执行了 3 次
2018-11-16 16:04:33.626 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "BTC" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.627 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "ETH" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.629 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "XRP" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
我无法重现您的问题。我是这样模仿的
public static void main(String[] args) {
Flux<String> stringFlux = Flux.fromStream(Stream.of("a", "b", "c"));
System.out.println(realtimeRates(stringFlux).blockFirst());
}
static Flux<List<String>> realtimeRates(Flux<String> list) {
Flux<String> symbols = list.map(Scratch::getSymbol);
Flux<String> topRates = symbols.flatMap(Scratch::findTopBySymbolOrderByTimestamp);
Flux<String> zip = topRates.zipWith(
Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
(rate, timestamp) -> rate + timestamp.toString());
Mono<List<String>> listMono = zip.collectList();
Mono<List<String>> delayElement = listMono.delayElement(Duration.ofSeconds(5));
Flux<List<String>> repeat = delayElement.repeat();
return repeat;
}
static Mono<String> findTopBySymbolOrderByTimestamp(String symbol) {
return Mono.just("other-" + symbol);
}
static String getSymbol(String rate) {
return rate.toLowerCase();
}
如您所见,您将得到类似于 [other-a1542821666133, other-b1542821666133, other-c1542821666133]
的内容。
您如何检查平面图结果?请注意,如果您使用 blockFirst()
或 blockLast()
方法执行此操作,您将只会获得一个元素,因为它是 Flux<String>
(检查上面代码中的 topRates
变量)
看来我的预期是正确的。唯一出错的是,并非所有对象都保存在 Mongo 中,因为它们缺少 ID。
我有以下演示应用程序设置:
- mongodb 包含 2 collections:1 个是加密货币,1 个是这些加密货币的汇率
- spring webflux 项目使用服务器发送事件获取这些汇率的实时更新
我有一项服务 returns List<CryptoCurrencyRateDTO>
的 Flux
基于加密货币 collection 中存在的货币。我为这些货币中的每一种生成一个随机汇率并将它们流式传输到 Web 客户端。
服务是这样的:
@Service
public class CryptoCurrencyRateService {
@Autowired private CryptoCurrencyRateRepository rateRepository;
@Autowired private CryptoCurrencyRepository currencyRepository;
// constructor
public Flux<List<CryptoCurrencyRateDTO>> realtimeRates() {
return currencyRepository.findAll()
.map(CryptoCurrency::getSymbol)
.flatMap(rateRepository::findTopBySymbolOrderByTimestamp)
.zipWith(
Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
(rate, timestamp) -> new CryptoCurrencyRate(rate.getSymbol(), timestamp, randomRateBasedOnPrevious )
)
.flatMap(rateRepository::save)
.map(rateMapper::toDto)
.collectList()
.delayElement(Duration.ofSeconds(5))
.repeat();
}
}
CryptoCurrencyRateRepository
如下:
@Repository
public interface CryptoCurrencyRateRepository extends ReactiveMongoRepository<CryptoCurrencyRate, String> {
Mono<CryptoCurrencyRate> findTopBySymbolOrderByTimestamp(String symbol);
}
然而,在调用 .flatMap(rateRepository::findTopBySymbolOrderByTimestamp)
之后,我只得到一个包含 1 个项目的 Flux
,而我认为我会得到一个 Flux
,其中包含 [=] 中每个符号的最高汇率20=] 调用,因为我的加密货币 collection 包含 3 种货币。
当我查看日志记录时,我发现对 findTopBySymbolOrderByTimestamp
的调用执行了 3 次
2018-11-16 16:04:33.626 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "BTC" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.627 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "ETH" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.629 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "XRP" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
我无法重现您的问题。我是这样模仿的
public static void main(String[] args) {
Flux<String> stringFlux = Flux.fromStream(Stream.of("a", "b", "c"));
System.out.println(realtimeRates(stringFlux).blockFirst());
}
static Flux<List<String>> realtimeRates(Flux<String> list) {
Flux<String> symbols = list.map(Scratch::getSymbol);
Flux<String> topRates = symbols.flatMap(Scratch::findTopBySymbolOrderByTimestamp);
Flux<String> zip = topRates.zipWith(
Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
(rate, timestamp) -> rate + timestamp.toString());
Mono<List<String>> listMono = zip.collectList();
Mono<List<String>> delayElement = listMono.delayElement(Duration.ofSeconds(5));
Flux<List<String>> repeat = delayElement.repeat();
return repeat;
}
static Mono<String> findTopBySymbolOrderByTimestamp(String symbol) {
return Mono.just("other-" + symbol);
}
static String getSymbol(String rate) {
return rate.toLowerCase();
}
如您所见,您将得到类似于 [other-a1542821666133, other-b1542821666133, other-c1542821666133]
的内容。
您如何检查平面图结果?请注意,如果您使用 blockFirst()
或 blockLast()
方法执行此操作,您将只会获得一个元素,因为它是 Flux<String>
(检查上面代码中的 topRates
变量)
看来我的预期是正确的。唯一出错的是,并非所有对象都保存在 Mongo 中,因为它们缺少 ID。