Flux.create 和 Flux.generate 之间的区别
Difference Between Flux.create and Flux.generate
Flux.create
and Flux.generate
有什么区别?我正在寻找——最好是通过一个示例用例——来了解我什么时候应该使用一个或另一个。
简而言之:
Flux::create
doesn't react to changes in the state of the app while Flux::generate
does.
长版
Flux::create
当您想计算不受应用程序状态和管道状态影响的多个(0...无穷大)值时,您将使用它(你的管道 ==在Flux::create
之后的操作链==下游) .
为什么?因为您发送给 Flux::create
的方法一直在计算元素(或 none)。 下游 将确定它想要多少元素(元素== 下一个信号),如果他跟不上,那些已经存在的元素emitted 将是 removed/buffered 在某些策略中(默认情况下,它们将被缓冲,直到 downstream 将要求更多)。
第一个也是最简单的用例是发射值,从理论上讲,您可以将这些值相加到一个集合中,然后才获取每个元素并对其进行处理:
Flux<String> articlesFlux = Flux.create((FluxSink<String> sink) -> {
/* get all the latest article from a server and emit them one by one to downstream. */
List<String> articals = getArticalsFromServer();
articals.forEach(sink::next);
});
如您所见,Flux.create
用于阻塞方法(getArticalsFromServer
)与异步代码之间的交互。
我确定 Flux.create
.
还有其他用例
Flux::generate
Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
synchronousSink.next(1);
})
.doOnNext(number -> System.out.println(number))
.doOnNext(number -> System.out.println(number + 4))
.subscribe();
输出将是1 5 1 5 1 5................forever
在您发送给Flux::generate
的方法的每次调用中,synchronousSink
只能发出:onSubscribe onNext? (onError | onComplete)?
.
这意味着 Flux::generate
将 计算并发出 值 按需 。你应该什么时候使用它?如果计算可能不使用的元素成本太高 下游 或者您发出的事件受应用程序状态或 状态的影响pipeline(你的pipeline == after Flux::create
== 的操作链下游).
例如,如果您正在构建一个 Torrent 应用程序,那么您将实时接收数据块。您可以使用 Flux::generate
将任务(要下载的块)分配给多个线程,并且仅当某些线程询问时,您才会计算要在 Flux::generate
中下载的块。所以你只会发出你没有的块。使用 Flux::create
的相同算法将失败,因为 Flux::create
将发出我们没有的所有块,如果某些块无法下载,那么我们就有问题了。因为 Flux::create
不会对应用程序状态的变化做出反应,而 Flux::generate
会。
创建:
- 接受
Consumer<FluxSink<T>>
- 每个订阅者只调用一次消费者
- 消费者可以立即发出 0..N 个元素
- 发布者不知道下游状态。所以我们需要提供溢出策略作为附加参数
- 我们可以获得 FluxSink 的引用,使用它我们可以在需要时使用多线程继续发射元素。
生成:
- 接受
Consumer<SynchronousSink<T>>
- Consumer根据下游需求一次又一次被调用
- 使用可选的 complete/error 信号,消费者最多只能发射一个元素。
- 发布者根据下游需求生产元素
- 我们可以得到SynchronousSink的引用。但它可能不是很有用,因为我们只能发射一个元素
查看此 blog 了解更多详情。
Flux.create
and Flux.generate
有什么区别?我正在寻找——最好是通过一个示例用例——来了解我什么时候应该使用一个或另一个。
简而言之:
Flux::create
doesn't react to changes in the state of the app whileFlux::generate
does.
长版
Flux::create
当您想计算不受应用程序状态和管道状态影响的多个(0...无穷大)值时,您将使用它(你的管道 ==在Flux::create
之后的操作链==下游) .
为什么?因为您发送给 Flux::create
的方法一直在计算元素(或 none)。 下游 将确定它想要多少元素(元素== 下一个信号),如果他跟不上,那些已经存在的元素emitted 将是 removed/buffered 在某些策略中(默认情况下,它们将被缓冲,直到 downstream 将要求更多)。
第一个也是最简单的用例是发射值,从理论上讲,您可以将这些值相加到一个集合中,然后才获取每个元素并对其进行处理:
Flux<String> articlesFlux = Flux.create((FluxSink<String> sink) -> {
/* get all the latest article from a server and emit them one by one to downstream. */
List<String> articals = getArticalsFromServer();
articals.forEach(sink::next);
});
如您所见,Flux.create
用于阻塞方法(getArticalsFromServer
)与异步代码之间的交互。
我确定 Flux.create
.
Flux::generate
Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
synchronousSink.next(1);
})
.doOnNext(number -> System.out.println(number))
.doOnNext(number -> System.out.println(number + 4))
.subscribe();
输出将是1 5 1 5 1 5................forever
在您发送给Flux::generate
的方法的每次调用中,synchronousSink
只能发出:onSubscribe onNext? (onError | onComplete)?
.
这意味着 Flux::generate
将 计算并发出 值 按需 。你应该什么时候使用它?如果计算可能不使用的元素成本太高 下游 或者您发出的事件受应用程序状态或 状态的影响pipeline(你的pipeline == after Flux::create
== 的操作链下游).
例如,如果您正在构建一个 Torrent 应用程序,那么您将实时接收数据块。您可以使用 Flux::generate
将任务(要下载的块)分配给多个线程,并且仅当某些线程询问时,您才会计算要在 Flux::generate
中下载的块。所以你只会发出你没有的块。使用 Flux::create
的相同算法将失败,因为 Flux::create
将发出我们没有的所有块,如果某些块无法下载,那么我们就有问题了。因为 Flux::create
不会对应用程序状态的变化做出反应,而 Flux::generate
会。
创建:
- 接受
Consumer<FluxSink<T>>
- 每个订阅者只调用一次消费者
- 消费者可以立即发出 0..N 个元素
- 发布者不知道下游状态。所以我们需要提供溢出策略作为附加参数
- 我们可以获得 FluxSink 的引用,使用它我们可以在需要时使用多线程继续发射元素。
生成:
- 接受
Consumer<SynchronousSink<T>>
- Consumer根据下游需求一次又一次被调用
- 使用可选的 complete/error 信号,消费者最多只能发射一个元素。
- 发布者根据下游需求生产元素
- 我们可以得到SynchronousSink的引用。但它可能不是很有用,因为我们只能发射一个元素
查看此 blog 了解更多详情。