Flux.create 和 Flux.generate 之间的区别

Difference Between Flux.create and Flux.generate

Flux.create and Flux.generate有什么区别?我正在寻找——最好是通过一个示例用例——来了解我什么时候应该使用一个或另一个。

简而言之:

Flux::create doesn't react to changes in the state of the app while Flux::generate does.


长版

Flux::create

当您想计算不受应用程序状态和管道状态影响的多个(0...无穷大)值时,您将使用它(你的管道 ==Flux::create之后的操作链==下游) .

为什么?因为您发送给 Flux::create 的方法一直在计算元素(或 none)。 下游 将确定它想要多少元素(元素== 下一个信号),如果他跟不上,那些已经存在的元素emitted 将是 removed/buffered 在某些策略中(默认情况下,它们将被缓冲,直到 downstream 将要求更多)。

第一个也是最简单的用例是发射值,从理论上讲,您可以将这些值相加到一个集合中,然后才获取每个元素并对其进行处理:

Flux<String> articlesFlux = Flux.create((FluxSink<String> sink) -> {
/* get all the latest article from a server and emit them one by one to downstream. */
List<String> articals = getArticalsFromServer();
articals.forEach(sink::next);
});

如您所见,Flux.create用于阻塞方法(getArticalsFromServer)与异步代码之间的交互。

我确定 Flux.create.

还有其他用例

Flux::generate

Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
        synchronousSink.next(1);
    })
    .doOnNext(number -> System.out.println(number))
    .doOnNext(number -> System.out.println(number + 4))
    .subscribe();

输出将是1 5 1 5 1 5................forever

在您发送给Flux::generate的方法的每次调用中,synchronousSink只能发出:onSubscribe onNext? (onError | onComplete)?.

这意味着 Flux::generate 计算并发出 按需 。你应该什么时候使用它?如果计算可能不使用的元素成本太高 下游 或者您发出的事件受应用程序状态或 状态的影响pipeline(你的pipeline == after Flux::create == 的操作链下游).

例如,如果您正在构建一个 Torrent 应用程序,那么您将实时接收数据块。您可以使用 Flux::generate 将任务(要下载的块)分配给多个线程,并且仅当某些线程询问时,您才会计算要在 Flux::generate 中下载的块。所以你只会发出你没有的块。使用 Flux::create 的相同算法将失败,因为 Flux::create 将发出我们没有的所有块,如果某些块无法下载,那么我们就有问题了。因为 Flux::create 不会对应用程序状态的变化做出反应,而 Flux::generate 会。

创建:

  • 接受 Consumer<FluxSink<T>>
  • 每个订阅者只调用一次消费者
  • 消费者可以立即发出 0..N 个元素
  • 发布者不知道下游状态。所以我们需要提供溢出策略作为附加参数
  • 我们可以获得 FluxSink 的引用,使用它我们可以在需要时使用多线程继续发射元素。

生成:

  • 接受 Consumer<SynchronousSink<T>>
  • Consumer根据下游需求一次又一次被调用
  • 使用可选的 complete/error 信号,消费者最多只能发射一个元素。
  • 发布者根据下游需求生产元素
  • 我们可以得到SynchronousSink的引用。但它可能不是很有用,因为我们只能发射一个元素

查看此 blog 了解更多详情。