如何在 Reactor 中使用 Context 和 flatMap()?
How to use Context with flatMap() in Reactor?
我在理解 Context 时遇到问题。所以文档说 Context 是:
A key/value store that is propagated between components such as
operators via the context protocol. Contexts are ideal to transport
orthogonal information such as tracing or security tokens.
太棒了。
现在让我们假设我们想要使用 Context 传播一些东西,让它无处不在。要调用另一个异步代码,我们只需使用 flatMap() 方法。
问题:如何在调用的方法中访问上下文?
示例(简单)代码:
public class TestFlatMap {
public static void main(final String ...args) {
final Flux<String> greetings = Flux.just("Hubert", "Sharon")
.flatMap(TestFlatMap::nameToGreeting)
.subscriberContext(context ->
Context.of("greetingWord", "Hello") // context initialized
);
greetings.subscribe(System.out::println);
}
private static Mono<String> nameToGreeting(final String name) {
return Mono.just("Hello " + name + " !!!"); // ALERT: we don't have Context here
}
}
被调用的方法可以(很可能会)在另一个 class。
提前感谢您的帮助!
编辑:删除了一些代码,使问题更加简洁明了。
将你的 Publisher
串起来,愿 Context
与你同在
在这种情况下,您连接了所有 Publisher
(这包括 flatMap
/concatMap
和类似运算符中的连接),您将正确地拥有 Context
在整个流运行时传播。
要在 nameToGreeting
方法中访问 Context
,如果方法似乎不相关,您可以调用 Mono.subscribeContext
并检索存储的信息事件。
下图显示了提到的概念:
public class TestFlatMap {
public static void main(final String ...args) {
final Flux<String> greetings = Flux.just("Hubert", "Sharon")
.flatMap(TestFlatMap::nameToGreeting)
.subscriberContext(context ->
Context.of("greetingWord", "Hello") // context initialized
);
greetings.subscribe(System.out::println);
}
private static Mono<String> nameToGreeting(final String name) {
return Mono.subscriberContext()
.filter(c -> c.hasKey("greetingWord"))
.map(c -> c.get("greetingWord"))
.flatMap(greetingWord -> Mono.just(greetingWord + " " + name + " " + "!!!"));// ALERT: we have Context here !!!
}
}
此外,您可以使用 zip
运算符按以下方式执行相同的操作,以便稍后合并结果:
public class TestFlatMap {
public static void main(final String ...args) {
final Flux<String> greetings = Flux.just("Hubert", "Sharon")
.flatMap(TestFlatMap::nameToGreeting)
.subscriberContext(context ->
Context.of("greetingWord", "Hello") // context initialized
);
greetings.subscribe(System.out::println);
}
private static Mono<String> nameToGreeting(final String name) {
return Mono.zip(
Mono.subscriberContext()
.filter(c -> c.hasKey("greetingWord"))
.map(c -> c.get("greetingWord")), // ALERT: we have Context here !!!
Mono.just(name),
(greetingWord, receivedName) -> greetingWord + " " + receivedName + " " + "!!!"
);
}
}
那么,它为什么有效?
正如我们从上面的示例中看到的,nameToGreeting
是在主 Flux
的上下文中调用的。在引擎盖下 -> (Here Some FluxFlatMap internals), each mapped Publisher
is subscribed by FlatMapInner
. If we look at the FlatMapInner
and look for the currentContext
override 我们将看到,FlatMapInner
使用父级 Context
,这意味着如果父级有一个 Reactor Context
- 那么这个上下文将传播到每个内部 Publisher
.
因此,nameToGreeting
方法返回的 Mono
将与其父
具有相同的 Context
引入 Reactor-Core v3.4 Mono.deferContextual and Flux.deferContextual, which supersede Mono.deferWithContext and Flux.deferWithContext 在 v3.3 中引入。
使用这些方法,可以简化为
public class TestFlatMap {
public static void main(final String ...args) {
final Flux<String> greetings = Flux.just("Hubert", "Sharon")
.flatMap(TestFlatMap::nameToGreeting)
.subscriberContext(context ->
Context.of("greetingWord", "Hello")); // context initialized
greetings.subscribe(System.out::println);
}
private static Mono<String> nameToGreeting(final String name) {
return Mono.deferContextual(c -> Mono.just(name)
.filter(x -> c.hasKey("greetingWord"))
.map(n -> c.get("greetingWord") + " " + n + " " + "!!!"));
}
}
我在理解 Context 时遇到问题。所以文档说 Context 是:
A key/value store that is propagated between components such as operators via the context protocol. Contexts are ideal to transport orthogonal information such as tracing or security tokens.
太棒了。
现在让我们假设我们想要使用 Context 传播一些东西,让它无处不在。要调用另一个异步代码,我们只需使用 flatMap() 方法。
问题:如何在调用的方法中访问上下文?
示例(简单)代码:
public class TestFlatMap {
public static void main(final String ...args) {
final Flux<String> greetings = Flux.just("Hubert", "Sharon")
.flatMap(TestFlatMap::nameToGreeting)
.subscriberContext(context ->
Context.of("greetingWord", "Hello") // context initialized
);
greetings.subscribe(System.out::println);
}
private static Mono<String> nameToGreeting(final String name) {
return Mono.just("Hello " + name + " !!!"); // ALERT: we don't have Context here
}
}
被调用的方法可以(很可能会)在另一个 class。
提前感谢您的帮助!
编辑:删除了一些代码,使问题更加简洁明了。
将你的 Publisher
串起来,愿 Context
与你同在
在这种情况下,您连接了所有 Publisher
(这包括 flatMap
/concatMap
和类似运算符中的连接),您将正确地拥有 Context
在整个流运行时传播。
要在 nameToGreeting
方法中访问 Context
,如果方法似乎不相关,您可以调用 Mono.subscribeContext
并检索存储的信息事件。
下图显示了提到的概念:
public class TestFlatMap {
public static void main(final String ...args) {
final Flux<String> greetings = Flux.just("Hubert", "Sharon")
.flatMap(TestFlatMap::nameToGreeting)
.subscriberContext(context ->
Context.of("greetingWord", "Hello") // context initialized
);
greetings.subscribe(System.out::println);
}
private static Mono<String> nameToGreeting(final String name) {
return Mono.subscriberContext()
.filter(c -> c.hasKey("greetingWord"))
.map(c -> c.get("greetingWord"))
.flatMap(greetingWord -> Mono.just(greetingWord + " " + name + " " + "!!!"));// ALERT: we have Context here !!!
}
}
此外,您可以使用 zip
运算符按以下方式执行相同的操作,以便稍后合并结果:
public class TestFlatMap {
public static void main(final String ...args) {
final Flux<String> greetings = Flux.just("Hubert", "Sharon")
.flatMap(TestFlatMap::nameToGreeting)
.subscriberContext(context ->
Context.of("greetingWord", "Hello") // context initialized
);
greetings.subscribe(System.out::println);
}
private static Mono<String> nameToGreeting(final String name) {
return Mono.zip(
Mono.subscriberContext()
.filter(c -> c.hasKey("greetingWord"))
.map(c -> c.get("greetingWord")), // ALERT: we have Context here !!!
Mono.just(name),
(greetingWord, receivedName) -> greetingWord + " " + receivedName + " " + "!!!"
);
}
}
那么,它为什么有效?
正如我们从上面的示例中看到的,nameToGreeting
是在主 Flux
的上下文中调用的。在引擎盖下 -> (Here Some FluxFlatMap internals), each mapped Publisher
is subscribed by FlatMapInner
. If we look at the FlatMapInner
and look for the currentContext
override 我们将看到,FlatMapInner
使用父级 Context
,这意味着如果父级有一个 Reactor Context
- 那么这个上下文将传播到每个内部 Publisher
.
因此,nameToGreeting
方法返回的 Mono
将与其父
Context
引入 Reactor-Core v3.4 Mono.deferContextual and Flux.deferContextual, which supersede Mono.deferWithContext and Flux.deferWithContext 在 v3.3 中引入。
使用这些方法,
public class TestFlatMap {
public static void main(final String ...args) {
final Flux<String> greetings = Flux.just("Hubert", "Sharon")
.flatMap(TestFlatMap::nameToGreeting)
.subscriberContext(context ->
Context.of("greetingWord", "Hello")); // context initialized
greetings.subscribe(System.out::println);
}
private static Mono<String> nameToGreeting(final String name) {
return Mono.deferContextual(c -> Mono.just(name)
.filter(x -> c.hasKey("greetingWord"))
.map(n -> c.get("greetingWord") + " " + n + " " + "!!!"));
}
}