Flux.switchOnFirst 的第二个参数有时不包含第一个元素?

First element sometimes not included in the second argument of Flux.switchOnFirst?

我正在尝试使用反应器中的 SwitchOnFirst 运算符,这太棒了 - 除了有时作为 BiFunction 的第二个参数传递的转换器似乎不包含第一个元素。 基本上,客户端通过 RSocket 向服务器发送 2 个项目。 代码服务器端如下所示:

val socket = new AbstractRSocket() {

    override def requestChannel(payloads: Publisher[Payload]): Flux[Payload] =
      Flux.from(payloads).log.switchOnFirst((signal, all) => handle(signal.get(), all))

    private def handle(first: Payload, all: Flux[Payload]): Flux[Payload] =
      extractRoute(first) match {
        case Some("test.route") =>
          val source = Source.fromPublisher(all.log()).map(_.getDataUtf8)
          actorSink.runWith(source)
          return Flux.from(actorSource).map(DefaultPayload.create).runWith(Sink.asPublisher(false)))
      }

  }

客户端第一次启动时,服务器会收到这两个项目并按预期将它们发布到 actorsink

[2020-02-01 16:17:42,656] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onSubscribe([Fuseable] FluxDoFinally.DoFinallyFuseableSubscriber) {}
[2020-02-01 16:17:42,658] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | request(1) {}
[2020-02-01 16:17:42,664] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onNext(io.rsocket.util.ByteBufPayload@53e655e6) {}
[2020-02-01 16:17:42,731] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - onSubscribe(FluxSwitchOnFirst.SwitchOnFirstInner) {}
[2020-02-01 16:17:42,736] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - request(16) {}
[2020-02-01 16:17:42,739] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - onNext(io.rsocket.util.ByteBufPayload@53e655e6) {}
[2020-02-01 16:17:42,741] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [akka.actor.default-dispatcher-9] - | request(15) {}
[Sink] Received (item1)
[2020-02-01 16:17:42,769] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onNext(io.rsocket.util.ByteBufPayload@50215db3) {}
[2020-02-01 16:17:42,769] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [reactor-tcp-epoll-2] - onNext(io.rsocket.util.ByteBufPayload@50215db3) {}
[Sink] Received (item2)
[2020-02-01 16:17:42,770] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onComplete() {}
[2020-02-01 16:17:42,771] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [reactor-tcp-epoll-2] - onComplete() {}
[Sink] Completed

但是,如果我停止客户端并再次 运行,则只会发布第二个项目。

[2020-02-01 16:18:13,746] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onSubscribe([Fuseable] FluxDoFinally.DoFinallyFuseableSubscriber) {}
[2020-02-01 16:18:13,746] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | request(1) {}
[2020-02-01 16:18:13,747] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onNext(io.rsocket.util.ByteBufPayload@5a2d7823) {}
[2020-02-01 16:18:13,751] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [akka.actor.default-dispatcher-6] - onSubscribe(FluxSwitchOnFirst.SwitchOnFirstInner) {}
[2020-02-01 16:18:13,752] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [akka.actor.default-dispatcher-6] - request(16) {}
[2020-02-01 16:18:13,752] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [akka.actor.default-dispatcher-6] - | request(16) {}
[2020-02-01 16:18:13,787] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onNext(io.rsocket.util.ByteBufPayload@1fa7bb46) {}
[2020-02-01 16:18:13,788] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [reactor-tcp-epoll-3] - onNext(io.rsocket.util.ByteBufPayload@1fa7bb46) {}
[Sink] Received (item2)
[2020-02-01 16:18:13,790] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onComplete() {}

一个区别是 [SwitchOnFirstInner.4] request(16) 触发 [DoFinallyFuseable.3] request(16),而不是 onNext 使用 SwitchOnFirst 中已经可用的第一个项目运算符。

我可能做错了什么,但不知道是什么。 switchOnFirst 的 javadoc 声明在所有情况下都应返回从原始 Flux 派生的发布者,但这里不是这种情况(输入被发送到 ActorSink,输出来自单独的 ActorSource),可能是问题吗?

我是 reactor / rsocket 的新手,如果我遗漏了一些明显的东西,我深表歉意。

没有从 all Flux 中导出 return Flux 可能确实是原因,特别是如果 actorSink.runWith(source) 请求 all 来源,因为 switchOnFirst 操作员应该处理请求。