Flux.generate() 抛出 java.lang.IllegalStateException:生成器没有调用任何 SynchronousSink 方法
Flux.generate() is throwing java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method
我正在尝试使用 Spring Boot (2.3.4.RELEASE) 进行反应式编程的第一步。
到目前为止,我正在尝试在 REST 控制器方法中调用的服务方法中创建无限的人员流,但它以该异常结束:“java.lang.IllegalStateException:生成器没有调用任何 SynchronousSink 方法
“
我尝试 google 几个小时的一些解决方案,但我没有找到任何适合我正在尝试做的事情的解决方案。
这是我的服务方式:
public Flux<PersonEntity> streamPersons() {
return personRepository.findMinId()
.zipWith(personRepository.findMaxId())
.flatMapMany(minMaxTuple ->
Flux.<PersonEntity> generate(stream ->
personRepository.findById(new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()).findFirst().getAsLong()
)
).delayElements(Duration.ofMillis(300))
);
}
更新 1
我尝试了以下答案中最新更新的代码片段。方法现在看起来像这样:
public Flux<PersonEntity> streamPersons() {
return personRepository.findMinId()
.zipWith(personRepository.findMaxId())
.flatMapMany(minMaxTuple ->
Flux.<PersonEntity>generate(sink -> {
Mono<PersonEntity> foundStock = stockRepository.findById(new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()).findFirst().getAsLong());
sink.next(foundStock);
}
).delayElements(Duration.ofMillis(300))
);
}
不幸的是,我遇到了一个编译器错误,我不知道如何摆脱它。局部变量 foundStock 是一个 Mono,但是方法 sink.next(...) 需要一个常规的 PersonEntity。如何在不阻塞的情况下转换它?
更新 2
这个解决方案是适合我的。我的目的是从数据库中随机抽取一个人,以无限流的形式将其流式传输到请求实例。特别感谢@Toerktumlare!
public Flux<PersonEntity> streamPersons() {
Mono<Tuple2<Long, Long>> minMaxIdTuple = personRepository.findMinId()
.zipWith(personRepository.findMaxId());
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
return interval.flatMapSequential(aLong ->
minMaxIdTuple.map(minMaxTuple ->
new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()))
.flatMapMany(longStream -> personRepository.findById(
longStream.findFirst().getAsLong()))
.flatMap(Flux::just)
);
}
我的控制器方法看起来像这样:
@GetMapping(value = "/person/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<PersonEntity> streamPersons() {
return personService.streamPersons();
}
上述异常的完整堆栈跟踪
java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method
at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:276) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler first.reactive.steps.controller.provider.PersonController#streamPersons() [DispatcherHandler]
|_ checkpoint ⇢ HTTP GET "/personapp/api/v1/persons/stream" [ExceptionHandlingWebHandler]
Stack trace:
at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:276) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:204) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:143) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:237) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:83) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8325) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:179) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:402) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:536) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.onComplete(Operators.java:1824) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onComplete(MonoIgnoreThen.java:314) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.onComplete(Operators.java:1824) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onComplete(MonoIgnoreThen.java:314) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:555) ~[reactor-pool-0.1.6.RELEASE.jar:0.1.6.RELEASE]
at reactor.core.publisher.Operators.complete(Operators.java:135) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:667) ~[reactor-pool-0.1.6.RELEASE.jar:0.1.6.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:76) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:156) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:375) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:326) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:185) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:243) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:346) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.request(FluxFilterFuseable.java:403) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:184) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:81) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onSubscribe(FluxFilterFuseable.java:298) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:255) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:148) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:35) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:394) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:115) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:520) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:468) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:412) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onComplete(FluxWindowPredicate.java:293) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:438) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$SerializedSink.drainLoop(FluxCreate.java:239) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$SerializedSink.drain(FluxCreate.java:205) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$SerializedSink.complete(FluxCreate.java:196) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:709) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:974) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:850) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:757) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:256) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
非常感谢任何帮助或提示。
提前致谢
一个Mono
或一个Flux
包含一个叫做sink
的东西。生成器函数是生成源源不断的项目的最简单形式。默认汇是 synchronous
并用于 one-by-one
排放,因此 synchronousSink
.
因此,通过调用生成器方法,您可以公开内部接收器,并且您需要通过接收器 api 通过调用接收器函数 next
、complete
或 error
函数。
示例:
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state);
if (state == 10) sink.complete();
return state + 1;
});
这里我们使用函数签名:
Flux#generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
所以我们是:
- 我们提供初始状态值 0。
- 我们使用状态来选择发射什么(3 的乘法 table 中的一行)。
- 我们也用它来选择何时停止。
- 我们 return 我们在下一次调用中使用的新状态(除非序列在此调用中终止)。
一旦我们订阅了 flux,这就开始了。
以上代码将生成以下序列。
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
在您的代码中,您没有在生成函数中调用 sinks next 方法。
您可以在官方文档中阅读更多关于接收器和生成器函数如何工作的信息Reactor Documentation - Producing在谷歌搜索任何其他内容之前,这些文档应该是您的第一个来源。
更新:
发布此答案后,发布了此评论:
“为什么 this 示例有效而不是我的代码”
他所指的例子是:
Flux<Dish> getDishes() {
return Flux.<Dish> generate(sink -> sink.next(randomDish())) //
.delayElements(Duration.ofMillis(250));
}
在这里我们可以清楚地看到他正在使用内部 sink
并每次调用 sink#next
向订阅客户端发送随机菜。
而在线程启动器提供的代码中:
Flux.<PersonEntity> generate(stream ->
personRepository.findById(new Random()
.longs(minMaxTuple.getT1(), minMaxTuple.getT2())
.findFirst()
.getAsLong()))
.delayElements(Duration.ofMillis(300))
没有使用水槽。
生成器方法用于生成项目,这意味着接收器需要一个具体的对象。它不能是 Mono
或 Flux
。如果您需要 return 这样的,您可以在它们之间进行转换,然后 return 直接将它们发送给客户端。
我正在尝试使用 Spring Boot (2.3.4.RELEASE) 进行反应式编程的第一步。 到目前为止,我正在尝试在 REST 控制器方法中调用的服务方法中创建无限的人员流,但它以该异常结束:“java.lang.IllegalStateException:生成器没有调用任何 SynchronousSink 方法 “ 我尝试 google 几个小时的一些解决方案,但我没有找到任何适合我正在尝试做的事情的解决方案。
这是我的服务方式:
public Flux<PersonEntity> streamPersons() {
return personRepository.findMinId()
.zipWith(personRepository.findMaxId())
.flatMapMany(minMaxTuple ->
Flux.<PersonEntity> generate(stream ->
personRepository.findById(new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()).findFirst().getAsLong()
)
).delayElements(Duration.ofMillis(300))
);
}
更新 1
我尝试了以下答案中最新更新的代码片段。方法现在看起来像这样:
public Flux<PersonEntity> streamPersons() {
return personRepository.findMinId()
.zipWith(personRepository.findMaxId())
.flatMapMany(minMaxTuple ->
Flux.<PersonEntity>generate(sink -> {
Mono<PersonEntity> foundStock = stockRepository.findById(new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()).findFirst().getAsLong());
sink.next(foundStock);
}
).delayElements(Duration.ofMillis(300))
);
}
不幸的是,我遇到了一个编译器错误,我不知道如何摆脱它。局部变量 foundStock 是一个 Mono,但是方法 sink.next(...) 需要一个常规的 PersonEntity。如何在不阻塞的情况下转换它?
更新 2
这个解决方案是适合我的。我的目的是从数据库中随机抽取一个人,以无限流的形式将其流式传输到请求实例。特别感谢@Toerktumlare!
public Flux<PersonEntity> streamPersons() {
Mono<Tuple2<Long, Long>> minMaxIdTuple = personRepository.findMinId()
.zipWith(personRepository.findMaxId());
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
return interval.flatMapSequential(aLong ->
minMaxIdTuple.map(minMaxTuple ->
new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()))
.flatMapMany(longStream -> personRepository.findById(
longStream.findFirst().getAsLong()))
.flatMap(Flux::just)
);
}
我的控制器方法看起来像这样:
@GetMapping(value = "/person/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<PersonEntity> streamPersons() {
return personService.streamPersons();
}
上述异常的完整堆栈跟踪
java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method
at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:276) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler first.reactive.steps.controller.provider.PersonController#streamPersons() [DispatcherHandler]
|_ checkpoint ⇢ HTTP GET "/personapp/api/v1/persons/stream" [ExceptionHandlingWebHandler]
Stack trace:
at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:276) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:204) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:143) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:237) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:83) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8325) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:179) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:402) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:536) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.onComplete(Operators.java:1824) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onComplete(MonoIgnoreThen.java:314) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.onComplete(Operators.java:1824) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onComplete(MonoIgnoreThen.java:314) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:555) ~[reactor-pool-0.1.6.RELEASE.jar:0.1.6.RELEASE]
at reactor.core.publisher.Operators.complete(Operators.java:135) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:667) ~[reactor-pool-0.1.6.RELEASE.jar:0.1.6.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:76) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:156) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:375) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:326) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:185) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:243) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:346) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.request(FluxFilterFuseable.java:403) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:184) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:81) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onSubscribe(FluxFilterFuseable.java:298) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:255) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:148) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:35) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:394) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:115) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:520) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:468) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:412) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onComplete(FluxWindowPredicate.java:293) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:438) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$SerializedSink.drainLoop(FluxCreate.java:239) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$SerializedSink.drain(FluxCreate.java:205) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxCreate$SerializedSink.complete(FluxCreate.java:196) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:709) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:974) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:850) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:757) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:256) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
非常感谢任何帮助或提示。 提前致谢
一个Mono
或一个Flux
包含一个叫做sink
的东西。生成器函数是生成源源不断的项目的最简单形式。默认汇是 synchronous
并用于 one-by-one
排放,因此 synchronousSink
.
因此,通过调用生成器方法,您可以公开内部接收器,并且您需要通过接收器 api 通过调用接收器函数 next
、complete
或 error
函数。
示例:
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state);
if (state == 10) sink.complete();
return state + 1;
});
这里我们使用函数签名:
Flux#generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
所以我们是:
- 我们提供初始状态值 0。
- 我们使用状态来选择发射什么(3 的乘法 table 中的一行)。
- 我们也用它来选择何时停止。
- 我们 return 我们在下一次调用中使用的新状态(除非序列在此调用中终止)。
一旦我们订阅了 flux,这就开始了。
以上代码将生成以下序列。
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
在您的代码中,您没有在生成函数中调用 sinks next 方法。
您可以在官方文档中阅读更多关于接收器和生成器函数如何工作的信息Reactor Documentation - Producing在谷歌搜索任何其他内容之前,这些文档应该是您的第一个来源。
更新:
发布此答案后,发布了此评论:
“为什么 this 示例有效而不是我的代码”
他所指的例子是:
Flux<Dish> getDishes() {
return Flux.<Dish> generate(sink -> sink.next(randomDish())) //
.delayElements(Duration.ofMillis(250));
}
在这里我们可以清楚地看到他正在使用内部 sink
并每次调用 sink#next
向订阅客户端发送随机菜。
而在线程启动器提供的代码中:
Flux.<PersonEntity> generate(stream ->
personRepository.findById(new Random()
.longs(minMaxTuple.getT1(), minMaxTuple.getT2())
.findFirst()
.getAsLong()))
.delayElements(Duration.ofMillis(300))
没有使用水槽。
生成器方法用于生成项目,这意味着接收器需要一个具体的对象。它不能是 Mono
或 Flux
。如果您需要 return 这样的,您可以在它们之间进行转换,然后 return 直接将它们发送给客户端。