Spring Reactor:每个通量元素的上下文
Spring Reactor: Context for each flux element
我有一个基于拉取的流数据源(就像 Kafka)。我想在这个事件处理应用程序上应用反应器。
目前,我使用 EmitterProcessor 创建了无限的事件序列。并且它在初始时间订阅一次并且从未取消。
下面的代码说明了我所做的。
public void initialize(){
EmitterProcessor<Event> emitter = ...
emitter.flatmap(this::step1)
.flatmap(this::step2)
.flatmap(this::finalStep)
//.subscriberContext(...)
.subscribe()
}
对于初始 Flux<Event>
中的每个事件,我需要 maintain/update 一个上下文,以便我可以获得每个步骤的所有输入和结果,并在最后一步中做一些报告。
一步步传递不可变的 Context
class 是一种选择,但这将导致所有 step()
都有一个额外的参数。并不是所有 step()
都会使用 Context
。在那种情况下,你只是通过 Context
和 return Pair<Context,OtherResult>
看起来很难看。 Pair
也很丑。
所以我更喜欢ThreadLocal<Context>
。显然在 reactor 中替换是 subscriberContext()
。但是根据我的代码,initialize()
将被调用一次。 Flux<Event>
将是 subscribe()
一次。 subscriberContext
不是我的 Event
级别,而是订阅级别。所以我的代码中只有一个上下文。没用。
问题是我应该将事件流视为一个 Flux<Event>
还是多个 Mono<Event>
并订阅每个事件?如果Mono<Event>
是最佳实践,那么我可以直接使用subscriberContext()
。但是是否有任何 assemble 时间开销(assemble 每个事件的到来)?
在reactor-kafka中,它使每批Record
成为一个Flux<Record>
,它如何实现像记录级上下文这样的东西?
谢谢。
根据您最后一次需要此上下文中的信息的时间,您可以选择使用单个 flatMap
为每个事件创建范围并为它们分配自己的上下文:
public void initialize(){
EmitterProcessor<Event> emitter = ...
emitter.flatMap(eventForScope ->
Mono.just(eventForScope)
.flatmap(this::step1)
.flatmap(this::step2)
.flatmap(this::finalStep)
.subscriberContext(...) //context for ONE event
)
.subscribe()
}
这可以进行调整,一些后期步骤可能不再需要每个事件的上下文,因此您可以将它们移到外部 flatMap
之外,等等...
这是可行的,因为 flatMap
的内部可以看到 "main" Context
,但是内部上下文的更改对于外部/主序列是不可见的。
我有一个基于拉取的流数据源(就像 Kafka)。我想在这个事件处理应用程序上应用反应器。
目前,我使用 EmitterProcessor 创建了无限的事件序列。并且它在初始时间订阅一次并且从未取消。
下面的代码说明了我所做的。
public void initialize(){
EmitterProcessor<Event> emitter = ...
emitter.flatmap(this::step1)
.flatmap(this::step2)
.flatmap(this::finalStep)
//.subscriberContext(...)
.subscribe()
}
对于初始 Flux<Event>
中的每个事件,我需要 maintain/update 一个上下文,以便我可以获得每个步骤的所有输入和结果,并在最后一步中做一些报告。
一步步传递不可变的 Context
class 是一种选择,但这将导致所有 step()
都有一个额外的参数。并不是所有 step()
都会使用 Context
。在那种情况下,你只是通过 Context
和 return Pair<Context,OtherResult>
看起来很难看。 Pair
也很丑。
所以我更喜欢ThreadLocal<Context>
。显然在 reactor 中替换是 subscriberContext()
。但是根据我的代码,initialize()
将被调用一次。 Flux<Event>
将是 subscribe()
一次。 subscriberContext
不是我的 Event
级别,而是订阅级别。所以我的代码中只有一个上下文。没用。
问题是我应该将事件流视为一个 Flux<Event>
还是多个 Mono<Event>
并订阅每个事件?如果Mono<Event>
是最佳实践,那么我可以直接使用subscriberContext()
。但是是否有任何 assemble 时间开销(assemble 每个事件的到来)?
在reactor-kafka中,它使每批Record
成为一个Flux<Record>
,它如何实现像记录级上下文这样的东西?
谢谢。
根据您最后一次需要此上下文中的信息的时间,您可以选择使用单个 flatMap
为每个事件创建范围并为它们分配自己的上下文:
public void initialize(){
EmitterProcessor<Event> emitter = ...
emitter.flatMap(eventForScope ->
Mono.just(eventForScope)
.flatmap(this::step1)
.flatmap(this::step2)
.flatmap(this::finalStep)
.subscriberContext(...) //context for ONE event
)
.subscribe()
}
这可以进行调整,一些后期步骤可能不再需要每个事件的上下文,因此您可以将它们移到外部 flatMap
之外,等等...
这是可行的,因为 flatMap
的内部可以看到 "main" Context
,但是内部上下文的更改对于外部/主序列是不可见的。