在 Project Reactor 中调用订阅的消费者参数是顺序的吗?
Are calls to subscribe's consumer argument sequential in Project Reactor?
使用以下代码:
flux.subscribe(consumer)
对 consumer
的调用可能发生在不同的线程上,这取决于 flux
的构造方式(例如,使用 subscribeOn
或 publishOn
)。是否可以保证即使对 consumer
的调用可能发生在不同的线程上,调用也是顺序的,即每次调用在下一次调用开始之前完成?
下面是一个更具体的例子(使用 Reactor-Kafka):
val resultFlux: Flux<Pair<TopicPartition, Long>> = KafkaReceiver
.create<K, V>(receiverOptions)
.receive()
.groupBy { m -> m.receiverOffset().topicPartition() }
.flatMap { partitionFlux ->
val parallelRoFlux = partitionFlux
.publishOn(scheduler)
.flatMapSequential(::processRecord, parallelism)
parallelRoFlux.map { ro ->
acknowledge(ro)
Pair(ro.topicPartition(), ro.offset())
}
}
resultFlux.doOnNext { Thread.sleep(2000); log.info("doOnNext: $it") }
.subscribe { Thread.sleep(1000); log.info("subscribe: $it") }
生成以下输出片段:
13:44:26.401 [elastic-6] INFO consumerSvcFlow - Message_5>>>processed
13:44:28.402 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 15)
13:44:29.402 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 15)
13:44:29.435 [elastic-8] INFO consumerSvcFlow - Message_8>>>processed
13:44:31.435 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 16)
13:44:32.436 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 16)
13:44:32.461 [elastic-6] INFO consumerSvcFlow - Message_9>>>processed
13:44:34.462 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 17)
13:44:35.462 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 17)
13:44:35.494 [elastic-8] INFO consumerSvcFlow - Message_15>>>processed
13:44:37.494 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 18)
13:44:38.495 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 18)
13:44:38.497 [elastic-6] INFO consumerSvcFlow - Message_18>>>processed
13:44:40.498 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 19)
13:44:41.499 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 19)
13:44:41.539 [elastic-8] INFO consumerSvcFlow - Message_19>>>processed
13:44:43.540 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 20)
13:44:44.540 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 20)
对 subscribe
消费者参数的调用是顺序的,但有些调用在线程 [elastic-6] 上,有些在线程 [elastic-8] 上。
是的,根据 Reactive Streams 规范,有这样的保证。
首先,调用可能发生在与您调用 subscribe()
的线程不同的线程 上。但是所有 consumer 调用都发生在同一个线程上。
其次,subscribe(Consumer<T>)
方法中的值消费者实际上被视为 Subscriber
中的 onNext
信号,因此规范强制要求此类调用相对于彼此进行序列化,并且onComplete
和 onError
信号。
编辑:现在您已经添加了一些片段,其中有 2 个线程的事实来自在 flatMap
中完成的 publishOn
。因此,groupBy
的每一组都可以从 Scheduler
中选择一个不同的 Worker
(如果有很多)。因此,可以并行执行在这些内部序列中完成的处理。 结果 然而,当被 flatMap
合并时,是序列化的 => subscribe(Consumer)
是连续的。
使用以下代码:
flux.subscribe(consumer)
对 consumer
的调用可能发生在不同的线程上,这取决于 flux
的构造方式(例如,使用 subscribeOn
或 publishOn
)。是否可以保证即使对 consumer
的调用可能发生在不同的线程上,调用也是顺序的,即每次调用在下一次调用开始之前完成?
下面是一个更具体的例子(使用 Reactor-Kafka):
val resultFlux: Flux<Pair<TopicPartition, Long>> = KafkaReceiver
.create<K, V>(receiverOptions)
.receive()
.groupBy { m -> m.receiverOffset().topicPartition() }
.flatMap { partitionFlux ->
val parallelRoFlux = partitionFlux
.publishOn(scheduler)
.flatMapSequential(::processRecord, parallelism)
parallelRoFlux.map { ro ->
acknowledge(ro)
Pair(ro.topicPartition(), ro.offset())
}
}
resultFlux.doOnNext { Thread.sleep(2000); log.info("doOnNext: $it") }
.subscribe { Thread.sleep(1000); log.info("subscribe: $it") }
生成以下输出片段:
13:44:26.401 [elastic-6] INFO consumerSvcFlow - Message_5>>>processed
13:44:28.402 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 15)
13:44:29.402 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 15)
13:44:29.435 [elastic-8] INFO consumerSvcFlow - Message_8>>>processed
13:44:31.435 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 16)
13:44:32.436 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 16)
13:44:32.461 [elastic-6] INFO consumerSvcFlow - Message_9>>>processed
13:44:34.462 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 17)
13:44:35.462 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 17)
13:44:35.494 [elastic-8] INFO consumerSvcFlow - Message_15>>>processed
13:44:37.494 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 18)
13:44:38.495 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 18)
13:44:38.497 [elastic-6] INFO consumerSvcFlow - Message_18>>>processed
13:44:40.498 [elastic-6] INFO consumerExecutable - doOnNext: (demo-topic-0, 19)
13:44:41.499 [elastic-6] INFO consumerExecutable - subscribe: (demo-topic-0, 19)
13:44:41.539 [elastic-8] INFO consumerSvcFlow - Message_19>>>processed
13:44:43.540 [elastic-8] INFO consumerExecutable - doOnNext: (demo-topic-0, 20)
13:44:44.540 [elastic-8] INFO consumerExecutable - subscribe: (demo-topic-0, 20)
对 subscribe
消费者参数的调用是顺序的,但有些调用在线程 [elastic-6] 上,有些在线程 [elastic-8] 上。
是的,根据 Reactive Streams 规范,有这样的保证。
首先,调用可能发生在与您调用 subscribe()
的线程不同的线程 上。但是所有 consumer 调用都发生在同一个线程上。
其次,subscribe(Consumer<T>)
方法中的值消费者实际上被视为 Subscriber
中的 onNext
信号,因此规范强制要求此类调用相对于彼此进行序列化,并且onComplete
和 onError
信号。
编辑:现在您已经添加了一些片段,其中有 2 个线程的事实来自在 flatMap
中完成的 publishOn
。因此,groupBy
的每一组都可以从 Scheduler
中选择一个不同的 Worker
(如果有很多)。因此,可以并行执行在这些内部序列中完成的处理。 结果 然而,当被 flatMap
合并时,是序列化的 => subscribe(Consumer)
是连续的。