Spring Reactor:每个通量元素的上下文

Spring Reactor: Context for each flux element

我有一个基于拉取的流数据源(就像 Kafka)。我想在这个事件处理应用程序上应用反应器。

目前,我使用 EmitterProcessor 创建了无限的事件序列。并且它在初始时间订阅一次并且从未取消。

下面的代码说明了我所做的。

public void initialize(){
    EmitterProcessor<Event> emitter = ...
    emitter.flatmap(this::step1)
           .flatmap(this::step2)
           .flatmap(this::finalStep)
         //.subscriberContext(...)
           .subscribe()
}

对于初始 Flux<Event> 中的每个事件,我需要 maintain/update 一个上下文,以便我可以获得每个步骤的所有输入和结果,并在最后一步中做一些报告。

一步步传递不可变的 Context class 是一种选择,但这将导致所有 step() 都有一个额外的参数。并不是所有 step() 都会使用 Context。在那种情况下,你只是通过 Context 和 return Pair<Context,OtherResult> 看起来很难看。 Pair 也很丑。

所以我更喜欢ThreadLocal<Context>。显然在 reactor 中替换是 subscriberContext()。但是根据我的代码,initialize() 将被调用一次。 Flux<Event> 将是 subscribe() 一次。 subscriberContext 不是我的 Event 级别,而是订阅级别。所以我的代码中只有一个上下文。没用。

问题是我应该将事件流视为一个 Flux<Event> 还是多个 Mono<Event> 并订阅每个事件?如果Mono<Event>是最佳实践,那么我可以直接使用subscriberContext()。但是是否有任何 assemble 时间开销(assemble 每个事件的到来)?

reactor-kafka中,它使每批Record成为一个Flux<Record>,它如何实现像记录级上下文这样的东西?

谢谢。

根据您最后一次需要此上下文中的信息的时间,您可以选择使用单个 flatMap 为每个事件创建范围并为它们分配自己的上下文:

public void initialize(){
    EmitterProcessor<Event> emitter = ...
    emitter.flatMap(eventForScope ->
        Mono.just(eventForScope)
            .flatmap(this::step1)
            .flatmap(this::step2)
            .flatmap(this::finalStep)
            .subscriberContext(...) //context for ONE event
        )
        .subscribe()
}

这可以进行调整,一些后期步骤可能不再需要每个事件的上下文,因此您可以将它们移到外部 flatMap 之外,等等...

这是可行的,因为 flatMap 的内部可以看到 "main" Context,但是内部上下文的更改对于外部/主序列是不可见的。