如何取消正在进行的 Spring Flux?
How to cancel an ongoing Spring Flux?
我正在使用 spring flux 向服务发送并行请求,这是它的非常简化的版本:
Flux.fromIterable(customers)
.flatMap { customer ->
client.call(customer)
} ...
我想知道如何取消这种通量,例如,以某种方式获取对通量的引用并告诉它关闭。
您可能知道,对于反应对象,all operators are lazy。这意味着管道的执行会延迟到您订阅反应流的那一刻。
因此,在您的示例中,还没有要取消的内容,因为此时没有任何事情发生。
但假设您的示例扩展为:
Disposable disp = Flux.fromIterable(customers)
.flatMap { customer ->
client.call(customer)
}
.subscribe();
然后,如您所见,您的订阅 returns 一个 Disposable
对象,您可以根据需要使用该对象取消整个订阅,例如
disp.dispose()
dispose 的文档说:
Cancel or dispose the underlying task or resource.
another section of the documentation 是这样说的:
These variants [of operators] return a reference to the subscription
that you can use to cancel the subscription when no more data is
needed. Upon cancellation, the source should stop producing values and
clean up any resources it created. This cancel and clean-up behavior
is represented in Reactor by the general-purpose Disposable
interface.
因此,取消流的执行在反应对象方面并非没有复杂性,因为如果在处理过程中取消流,您希望确保让世界处于一致的状态。例如,如果您正在构建某些东西,您可能想要丢弃资源、销毁任何部分聚合结果、关闭文件、通道、释放内存或您拥有的任何其他资源,可能会撤消更改或补偿它们。
您可能想阅读有关 cleanup 的文档,这样您也可以考虑在反应对象方面可以做什么。
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
@Edwin 的回答很准确。只要你不调用订阅,就没有什么可以取消的,因为不会执行任何代码。
只是想添加一个例子来说明。
public static void main(String[] args) throws InterruptedException {
List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Disposable disposable = Flux.fromIterable(lists)
.delayElements(Duration.ofSeconds(3))
.map(String::toLowerCase)
.subscribe(System.out::println);
Thread.sleep(5000); //Sleeping so that some elements in the flux gets printed
disposable.dispose();
Thread.sleep(10000); // Sleeping so that we can prove even waiting for some time nothing gets printed after cancelling the flux
}
但我想说一种更简洁的方法(函数式方法)是使用像 takeUntil
或 take
这样的函数。例如,我也可以像这样停止上面示例中的流。
List<String> lists = Lists.newArrayList("abc", "def", "End", "ghi");
Flux.fromIterable(lists).takeUntil(s -> s.equalsIgnoreCase("End"))
.delayElements(Duration.ofSeconds(3))
.map(String::toLowerCase)
.subscribe(System.out::println);
或
List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Flux.fromIterable(lists).take(2)
.delayElements(Duration.ofSeconds(2))
.map(String::toLowerCase)
.subscribe(System.out::println);
另一个 subscribe
我的通量然后调用 dispose
为我做了:
// Setup flux and populate
Flux<String> myFlux = controller.get(json);
// Subscribe
FlowSubscriber<String> sub = new FlowSubscriber<String>();
myFlux.subscribe(sub);
// Work on elements in the subscription
String myString = sub.consumedElements.get(0);
... do work ...
// Cancel
myFlux.subscribe().dispose();
我正在使用 spring flux 向服务发送并行请求,这是它的非常简化的版本:
Flux.fromIterable(customers)
.flatMap { customer ->
client.call(customer)
} ...
我想知道如何取消这种通量,例如,以某种方式获取对通量的引用并告诉它关闭。
您可能知道,对于反应对象,all operators are lazy。这意味着管道的执行会延迟到您订阅反应流的那一刻。
因此,在您的示例中,还没有要取消的内容,因为此时没有任何事情发生。
但假设您的示例扩展为:
Disposable disp = Flux.fromIterable(customers)
.flatMap { customer ->
client.call(customer)
}
.subscribe();
然后,如您所见,您的订阅 returns 一个 Disposable
对象,您可以根据需要使用该对象取消整个订阅,例如
disp.dispose()
dispose 的文档说:
Cancel or dispose the underlying task or resource.
another section of the documentation 是这样说的:
These variants [of operators] return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel and clean-up behavior is represented in Reactor by the general-purpose
Disposable
interface.
因此,取消流的执行在反应对象方面并非没有复杂性,因为如果在处理过程中取消流,您希望确保让世界处于一致的状态。例如,如果您正在构建某些东西,您可能想要丢弃资源、销毁任何部分聚合结果、关闭文件、通道、释放内存或您拥有的任何其他资源,可能会撤消更改或补偿它们。
您可能想阅读有关 cleanup 的文档,这样您也可以考虑在反应对象方面可以做什么。
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
@Edwin 的回答很准确。只要你不调用订阅,就没有什么可以取消的,因为不会执行任何代码。
只是想添加一个例子来说明。
public static void main(String[] args) throws InterruptedException {
List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Disposable disposable = Flux.fromIterable(lists)
.delayElements(Duration.ofSeconds(3))
.map(String::toLowerCase)
.subscribe(System.out::println);
Thread.sleep(5000); //Sleeping so that some elements in the flux gets printed
disposable.dispose();
Thread.sleep(10000); // Sleeping so that we can prove even waiting for some time nothing gets printed after cancelling the flux
}
但我想说一种更简洁的方法(函数式方法)是使用像 takeUntil
或 take
这样的函数。例如,我也可以像这样停止上面示例中的流。
List<String> lists = Lists.newArrayList("abc", "def", "End", "ghi");
Flux.fromIterable(lists).takeUntil(s -> s.equalsIgnoreCase("End"))
.delayElements(Duration.ofSeconds(3))
.map(String::toLowerCase)
.subscribe(System.out::println);
或
List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Flux.fromIterable(lists).take(2)
.delayElements(Duration.ofSeconds(2))
.map(String::toLowerCase)
.subscribe(System.out::println);
另一个 subscribe
我的通量然后调用 dispose
为我做了:
// Setup flux and populate
Flux<String> myFlux = controller.get(json);
// Subscribe
FlowSubscriber<String> sub = new FlowSubscriber<String>();
myFlux.subscribe(sub);
// Work on elements in the subscription
String myString = sub.consumedElements.get(0);
... do work ...
// Cancel
myFlux.subscribe().dispose();