如何在 Reactor 中使用 Context 和 flatMap()?

How to use Context with flatMap() in Reactor?

我在理解 Context 时遇到问题。所以文档说 Context 是:

A key/value store that is propagated between components such as operators via the context protocol. Contexts are ideal to transport orthogonal information such as tracing or security tokens.

太棒了。

现在让我们假设我们想要使用 Context 传播一些东西,让它无处不在。要调用另一个异步代码,我们只需使用 flatMap() 方法。

问题如何在调用的方法中访问上下文?

示例(简单)代码:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
            .flatMap(TestFlatMap::nameToGreeting)
            .subscriberContext(context ->
                Context.of("greetingWord", "Hello")  // context initialized
            );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.just("Hello " + name + " !!!");  // ALERT: we don't have Context here
    }
}

被调用的方法可以(很可能会)在另一个 class。

提前感谢您的帮助!

编辑:删除了一些代码,使问题更加简洁明了。

将你的 Publisher 串起来,愿 Context 与你同在

在这种情况下,您连接了所有 Publisher(这包括 flatMap/concatMap 和类似运算符中的连接),您将正确地拥有 Context在整个流运行时传播。

要在 nameToGreeting 方法中访问 Context,如果方法似乎不相关,您可以调用 Mono.subscribeContext 并检索存储的信息事件。 下图显示了提到的概念:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                                           .flatMap(TestFlatMap::nameToGreeting)
                                           .subscriberContext(context ->
                                                   Context.of("greetingWord", "Hello")  // context initialized
                                           );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.subscriberContext()
                   .filter(c -> c.hasKey("greetingWord"))
                   .map(c -> c.get("greetingWord"))
                   .flatMap(greetingWord -> Mono.just(greetingWord + " " + name + " " + "!!!"));// ALERT: we have Context here !!!
    }
}

此外,您可以使用 zip 运算符按以下方式执行相同的操作,以便稍后合并结果:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                                           .flatMap(TestFlatMap::nameToGreeting)
                                           .subscriberContext(context ->
                                                   Context.of("greetingWord", "Hello")  // context initialized
                                           );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.zip(
            Mono.subscriberContext()
                .filter(c -> c.hasKey("greetingWord"))
                .map(c -> c.get("greetingWord")), // ALERT: we have Context here !!!
            Mono.just(name),
            (greetingWord, receivedName) -> greetingWord + " " + receivedName + " " + "!!!"
        );
    }
}

那么,它为什么有效?

正如我们从上面的示例中看到的,nameToGreeting 是在主 Flux 的上下文中调用的。在引擎盖下 -> (Here Some FluxFlatMap internals), each mapped Publisher is subscribed by FlatMapInner. If we look at the FlatMapInner and look for the currentContext override 我们将看到,FlatMapInner 使用父级 Context,这意味着如果父级有一个 Reactor Context - 那么这个上下文将传播到每个内部 Publisher.

因此,nameToGreeting 方法返回的 Mono 将与其父

具有相同的 Context

引入 Reactor-Core v3.4 Mono.deferContextual and Flux.deferContextual, which supersede Mono.deferWithContext and Flux.deferWithContext 在 v3.3 中引入。

使用这些方法,可以简化为

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                .flatMap(TestFlatMap::nameToGreeting)
                .subscriberContext(context ->
                        Context.of("greetingWord", "Hello"));  // context initialized
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.deferContextual(c -> Mono.just(name)
                .filter(x -> c.hasKey("greetingWord"))
                .map(n -> c.get("greetingWord") + " " + n + " " + "!!!"));
    }
}