如何使用 Project Reactor design/create 反应式方法

How to design/create reactive method with Project Reactor

我刚刚开始使用基于项目反应器的反应式框架,例如 spring-webflux,对此我有一些疑问。

问题一:

示例 1:

  public Mono<String> doSome(String str){
    String key = str.substring(0, 5).toLowerCase() + str.substring(6);
    return getValueFromRedis(key);
  }

  public Mono<String> getValueFromRedis(String key){
    return reactiveRedisOperations().opsForHash().get("TestCache", key);
  }

示例 2:

  public Mono<String> doSome(String str){
    return Mono.fromCallable(() -> {
      String key = str.substring(0, 5).toLowerCase() + str.substring(6);
      return getValueFromRedis(key);
    }).flatMap(stringMono -> stringMono);
  }

  public Mono<String> getValueFromRedis(String key){
    return reactiveRedisOperations().opsForHash().get("TestCache", key);
  }

两个示例之间有区别还是两者都可以接受。

问题二:

示例 1:

  @PostMapping(value = "/greet")
  public Mono<String> greet(String name) {
    return Mono.fromCallable(() -> aMethod(name));
    // or return Mono.just(aMethod(name));
  }

  public String aMethod(String name){
    return "Hello: " + name;
  }

示例 2:

  @PostMapping(value = "/greet")
  public Mono<String> greet(String name) {
    return aMethod(name);
  }

  public Mono<String> aMethod(String name){
    return Mono.just("Hello: " + name);
  }

我知道第二个问题很奇怪,但我想知道所有方法都应该 return Mono 或 Flux 或者我可以像 Question2/Example1 那样使用吗?

问题1:是的,有区别。在示例 1 中,您在 Mono.fromCallable 之外创建了 String key。这在这里没什么大不了的,但如果它是昂贵的操作,你会减慢一切。

这个逻辑 reactiveRedisOperations().opsForHash() 也是在 Mono.fromCallable 之外执行的。同样的事情 - 如果这很昂贵,你就会放慢一切。

问题2:与问题1相同。Mono.just接受一个常规对象,而不是稍后在需要时调用的对象(如CallableSupplier)。因此,当使用 Mono.just 时,您将立即付出参数初始化的代价。

在你的例子中几乎没有任何区别,但是 MonoFlux 通常用于以异步方式链接昂贵的操作,所以没有任何东西被阻塞,比如数据库调用或对其他外部的调用服务。看看我下面的示例,其中 sleep 是对外部调用的模拟。注意打印语句的顺序。

public String superLongMethod() {
  System.out.println("superLongMethod");
  Thread.sleep(10000);
  return "ok";
}

System.out.println("before");
Mono.just(superLongMethod());
System.out.println("after");

// printed output is - before - superLongMethod - after

-----------------------------------------------------------------

System.out.println("before");
Mono.fromCallable(() -> superLongMethod());
System.out.println("after");

// printed output is - before - after - superLongMethod

-----------------------------------------------------------------

System.out.println("before");
String key = superLongMethod();
System.out.println("after");
return getValueFromRedis(key);

// printed output is - before - superLongMethod - after

-----------------------------------------------------------------

System.out.println("before");
Mono<String> mono = Mono.fromCallable(() -> {
  String key = superLongMethod();
  return getValueFromRedis(key);
}).flatMap(stringMono -> stringMono);
System.out.println("after");
return mono;

// printed output is - before - after - superLongMethod

fromCallable 每次 Mono 有 一个新订阅者 时执行 lambda 而 just 捕获 它的参数在实例化时间并将其发送给每个订阅者。

以下代码说明了区别:

private Instant getTime() {
    final Instant now = Instant.now();
    System.out.println("getTime(): " + now);
    return now;
}

@Test
public void just() throws InterruptedException {
    final Mono<Instant> mono = Mono.just(getTime())
            .doOnNext(instant -> System.out.println(instant));

    Thread.sleep(500);

    mono.subscribe();

    Thread.sleep(500);

    mono.subscribe();

    /* output is
        getTime(): 2019-08-14T22:47:06.823Z
        2019-08-14T22:47:06.823Z
        2019-08-14T22:47:06.823Z
    */
}

@Test
public void fromCallable() throws InterruptedException {
    final Mono<Instant> mono = Mono.fromCallable(() -> getTime())
            .doOnNext(instant -> System.out.println(instant));

    Thread.sleep(500);

    mono.subscribe();

    Thread.sleep(500);

    mono.subscribe();

    /* output is
        getTime(): 2019-08-14T22:47:13.947Z
        2019-08-14T22:47:13.947Z
        getTime(): 2019-08-14T22:47:14.447Z
        2019-08-14T22:47:14.447Z
    */
}

(你也可以看看 ,它是关于 RxJava 的,但它的 API 几乎与 Reactor 相同。

所以你的问题 1 的答案是:是的,有区别。 在示例 1 中调用 doSome 将立即导致从 Redis 获取异步值。在示例 2 中,只有有人订阅了 doSome 方法返回的 Mono,Redis 才会参与。