Flux.generate() 抛出 java.lang.IllegalStateException:生成器没有调用任何 SynchronousSink 方法

Flux.generate() is throwing java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method

我正在尝试使用 Spring Boot (2.3.4.RELEASE) 进行反应式编程的第一步。 到目前为止,我正在尝试在 REST 控制器方法中调用的服务方法中创建无限的人员流,但它以该异常结束:“java.lang.IllegalStateException:生成器没有调用任何 SynchronousSink 方法 “ 我尝试 google 几个小时的一些解决方案,但我没有找到任何适合我正在尝试做的事情的解决方案。

这是我的服务方式:

public Flux<PersonEntity> streamPersons() {
        return personRepository.findMinId()
                .zipWith(personRepository.findMaxId())
                .flatMapMany(minMaxTuple ->
                        Flux.<PersonEntity> generate(stream ->
                                personRepository.findById(new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()).findFirst().getAsLong()
                                )
                        ).delayElements(Duration.ofMillis(300))
                );
    } 

更新 1

我尝试了以下答案中最新更新的代码片段。方法现在看起来像这样:

    public Flux<PersonEntity> streamPersons() {
        return personRepository.findMinId()
                .zipWith(personRepository.findMaxId())
                .flatMapMany(minMaxTuple ->
                        Flux.<PersonEntity>generate(sink -> {
                            Mono<PersonEntity> foundStock = stockRepository.findById(new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()).findFirst().getAsLong());
                            sink.next(foundStock);
                        }
                        ).delayElements(Duration.ofMillis(300))
                );
    }

不幸的是,我遇到了一个编译器错误,我不知道如何摆脱它。局部变量 foundStock 是一个 Mono,但是方法 sink.next(...) 需要一个常规的 PersonEntity。如何在不阻塞的情况下转换它?

更新 2

这个解决方案是适合我的。我的目的是从数据库中随机抽取一个人,以无限流的形式将其流式传输到请求实例。特别感谢@Toerktumlare!

public Flux<PersonEntity> streamPersons() {
    Mono<Tuple2<Long, Long>> minMaxIdTuple = personRepository.findMinId()
            .zipWith(personRepository.findMaxId());
    Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
    
    return interval.flatMapSequential(aLong ->
           minMaxIdTuple.map(minMaxTuple ->
                new Random().longs(minMaxTuple.getT1(), minMaxTuple.getT2()))
                    .flatMapMany(longStream -> personRepository.findById(
                         longStream.findFirst().getAsLong()))
                    .flatMap(Flux::just)
        );
    }

我的控制器方法看起来像这样:

    @GetMapping(value = "/person/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<PersonEntity> streamPersons() {
        return personService.streamPersons();
    }

上述异常的完整堆栈跟踪

java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method
    at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:276) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Handler first.reactive.steps.controller.provider.PersonController#streamPersons() [DispatcherHandler]
    |_ checkpoint ⇢ HTTP GET "/personapp/api/v1/persons/stream" [ExceptionHandlingWebHandler]
Stack trace:
        at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:276) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:204) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:143) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:237) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:83) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Flux.subscribe(Flux.java:8325) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:179) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:402) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:536) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Operators$MonoSubscriber.onComplete(Operators.java:1824) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onComplete(MonoIgnoreThen.java:314) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Operators$MonoSubscriber.onComplete(Operators.java:1824) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onComplete(MonoIgnoreThen.java:314) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:555) ~[reactor-pool-0.1.6.RELEASE.jar:0.1.6.RELEASE]
        at reactor.core.publisher.Operators.complete(Operators.java:135) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:667) ~[reactor-pool-0.1.6.RELEASE.jar:0.1.6.RELEASE]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:76) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:156) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:375) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:326) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:185) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:243) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:346) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.request(FluxFilterFuseable.java:403) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:184) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:81) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onSubscribe(FluxFilterFuseable.java:298) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:255) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:148) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:35) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:394) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:115) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:520) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:468) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:412) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onComplete(FluxWindowPredicate.java:293) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:438) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxCreate$SerializedSink.drainLoop(FluxCreate.java:239) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxCreate$SerializedSink.drain(FluxCreate.java:205) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxCreate$SerializedSink.complete(FluxCreate.java:196) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:709) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:974) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:850) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:757) ~[r2dbc-postgresql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:256) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.12.RELEASE.jar:0.9.12.RELEASE]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

非常感谢任何帮助或提示。 提前致谢

一个Mono或一个Flux包含一个叫做sink的东西。生成器函数是生成源源不断的项目的最简单形式。默认汇是 synchronous 并用于 one-by-one 排放,因此 synchronousSink.

因此,通过调用生成器方法,您可以公开内部接收器,并且您需要通过接收器 api 通过调用接收器函数 nextcompleteerror函数。

示例:

Flux<String> flux = Flux.generate(
    () -> 0, 
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); 
      if (state == 10) sink.complete(); 
      return state + 1; 
    });

这里我们使用函数签名:

Flux#generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)

所以我们是:

  • 我们提供初始状态值 0。
  • 我们使用状态来选择发射什么(3 的乘法 table 中的一行)。
  • 我们也用它来选择何时停止。
  • 我们 return 我们在下一次调用中使用的新状态(除非序列在此调用中终止)。

一旦我们订阅了 flux,这就开始了。

以上代码将生成以下序列。

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

在您的代码中,您没有在生成函数中调用 sinks next 方法。

您可以在官方文档中阅读更多关于接收器和生成器函数如何工作的信息Reactor Documentation - Producing在谷歌搜索任何其他内容之前,这些文档应该是您的第一个来源。

更新:

发布此答案后,发布了此评论:

“为什么 this 示例有效而不是我的代码”

他所指的例子是:

Flux<Dish> getDishes() {
    return Flux.<Dish> generate(sink -> sink.next(randomDish())) //
                .delayElements(Duration.ofMillis(250));
}

在这里我们可以清楚地看到他正在使用内部 sink 并每次调用 sink#next 向订阅客户端发送随机菜。

而在线程启动器提供的代码中:

Flux.<PersonEntity> generate(stream ->
       personRepository.findById(new Random()
           .longs(minMaxTuple.getT1(), minMaxTuple.getT2())
           .findFirst()
           .getAsLong()))
       .delayElements(Duration.ofMillis(300))

没有使用水槽。

生成器方法用于生成项目,这意味着接收器需要一个具体的对象。它不能是 MonoFlux。如果您需要 return 这样的,您可以在它们之间进行转换,然后 return 直接将它们发送给客户端。