如何在反应器 webflux 中丰富对象?

How to do enrichment of an object in reactor webflux?

我有接口

    public interface Enricher {
       Mono<MyObject> enrich(MyObject obj);
    }

我有此接口的实现,return更新后的 MyObject。

我有另一个名为 CompositeEnricher 的实现,它在构造函数中接收 Enricher 列表并执行丰富。

在非响应式的世界里,我会写这样的东西

/*
  Please note that following code is applicable only if Enricher interface returns MyObject instead of Mono<MyObject>
*/

public class CompositeEnricher implements Enricher {

  private final List<Enricher> enrichers;

  public CompositeEnricher(List<Enricher> enrichers) {
    this.enrichers = enrichers;
  }

  @Override
  public MyObject enrich(MyObject myObject) {
    MyObject updated = myObject;
    for(Enricher enricher : enrichers) {
      updated = enricher.enrich(updated);
    }
    return updated;
  }
}

如何将其更改为反应式,以便 Enricher return Mono 的每个实现都包括实现 Enricher 接口的 CompositeEnricher?

实际上根本不需要单独的 CompositeEnricher class - 给定一个 List<Enricher> enrichers,您可以使用标准 Java 流简单地减少您的 enrichers:

Enricher composite = enrichers.stream()
        .reduce((e1, e2) -> myObj -> e1.enrich(myObj).flatMap(myObj2 -> e2.enrich(myObj2)))
        .get();

当然,如果需要,您可以将其充实为 class,但您同样可以将其作为辅助方法在某处实现。

我使用 Mono.expand 方法解决了问题。但我不得不使用 enrichers.iterator()。想知道我是否可以以某种方式将 enrichers 也放在反应链中而不是使用迭代器。

@Override
    public Mono<User> enrich(User user) {
        Iterator<Enricher> iterator = enrichers.iterator();
        Function<User, Publisher<User>> enrichmentFunction = u -> iterator.hasNext() ? iterator.next().enrich(u) : Mono.empty();
        return Mono.just(user)
                .expand(enrichmentFunction)
                .last();
    }