如何不依赖 Reactor 中的订阅时间
How not to depend on subscription time in Reactor
我一直在通读 Reactor 文档,但我无法找到以下问题的正确模式。
我有一个应该异步执行某些操作的方法。我 returns 以 Flux 的形式响应结果,消费者可以订阅它。
方法定义如下:
Flux<ResultMessage> sendRequest(RequestMessage message);
返回的通量是一个热通量,结果可以在任何给定时间异步出现。
潜在消费者可以通过以下方式使用它:
sendRequest(message).subscribe(response->doSomethinWithResponse(response);
一个实现可以是这样的:
Flux<ResultMessage> sendRequest(RequestMessage message) {
Flux<ResultMessage> result = incomingMessageStream
.filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
.take( 2 );
// The message sending is done here...
return result;
}
其中 incomingMessageStream
是通过此频道的所有消息的 Flux
。
这个实现的问题是消费者在结果消息到来之后被订阅,它可能会错过其中的一些。
所以,我正在寻找一种解决方案,让消费者不依赖于订阅时间。可能根本不需要潜在消费者订阅结果 Flux
。我正在寻找通用解决方案,但如果不可能,您可以假设生成的消息数不大于 2。
一段时间后,我创建了一个似乎有效的解决方案:
Flux<ResultMessage> sendRequest(RequestMessage message) {
final int maxResponsesCount = 2;
final Duration responseTimeout = Duration.ofSeconds( 10 );
final Duration subscriptionTimeout = Duration.ofSeconds( 5 );
// (1)
ConnectableFlux<ResultMessage> result = incomingMessageStream
.ofType( ResultMessage.class )
.filter( resultMessage ->Objects.equals(resultMessage.getId(), message.getId() ) )
.take( maxResponsesCount )
.timeout( responseTimeout )
.replay( maxResponsesCount );
Disposable connectionDisposable = result.connect();
// (2)
AtomicReference<Subscription> subscriptionForCancelSubscription = new AtomicReference<>();
Mono.delay( subscriptionTimeout )
.doOnSubscribe( subscriptionForCancelSubscription::set )
.subscribe( x -> connectionDisposable.dispose() );
// The message sending is done here...
// (3)
return result
.doOnSubscribe(s ->subscriptionForCancelSubscription.get().cancel())
.doFinally( signalType -> connectionDisposable.dispose() );
}
我正在使用一个 ConnectableFlux,它可以立即连接到流,无需订阅,它设置为使用 reply() 方法来存储所有消息,因此稍后的任何订阅者都不会错过响应消息 (1)。
可以执行的路径很少:
- 方法已调用,但未对通量执行任何订阅
- 解决方案 - 如果没有完成订阅,有一个计时器会在 5 秒后删除连接的通量资源。 (2)
方法被调用并订阅了flux
2.1。没有返回消息
- 解决方案 - 设置了获取响应的超时时间 (
.timeout( responseTimeout )
)。之后 .doFinally(..)
清理资源 (1)(3).
2.2。部分回复信息已返回
- 解决方案 - 与 2.1 相同。
2.3。已返回所有响应消息
- 解决方案 - 由于达到最大元素数 (
.take( maxResponsesCount )
) 而执行 doFinally()
(1)(3)
我还没有对此进行一些认真的测试,如果出现问题,我会在这个答案中添加更正。
我一直在通读 Reactor 文档,但我无法找到以下问题的正确模式。 我有一个应该异步执行某些操作的方法。我 returns 以 Flux 的形式响应结果,消费者可以订阅它。
方法定义如下:
Flux<ResultMessage> sendRequest(RequestMessage message);
返回的通量是一个热通量,结果可以在任何给定时间异步出现。
潜在消费者可以通过以下方式使用它:
sendRequest(message).subscribe(response->doSomethinWithResponse(response);
一个实现可以是这样的:
Flux<ResultMessage> sendRequest(RequestMessage message) {
Flux<ResultMessage> result = incomingMessageStream
.filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
.take( 2 );
// The message sending is done here...
return result;
}
其中 incomingMessageStream
是通过此频道的所有消息的 Flux
。
这个实现的问题是消费者在结果消息到来之后被订阅,它可能会错过其中的一些。
所以,我正在寻找一种解决方案,让消费者不依赖于订阅时间。可能根本不需要潜在消费者订阅结果 Flux
。我正在寻找通用解决方案,但如果不可能,您可以假设生成的消息数不大于 2。
一段时间后,我创建了一个似乎有效的解决方案:
Flux<ResultMessage> sendRequest(RequestMessage message) {
final int maxResponsesCount = 2;
final Duration responseTimeout = Duration.ofSeconds( 10 );
final Duration subscriptionTimeout = Duration.ofSeconds( 5 );
// (1)
ConnectableFlux<ResultMessage> result = incomingMessageStream
.ofType( ResultMessage.class )
.filter( resultMessage ->Objects.equals(resultMessage.getId(), message.getId() ) )
.take( maxResponsesCount )
.timeout( responseTimeout )
.replay( maxResponsesCount );
Disposable connectionDisposable = result.connect();
// (2)
AtomicReference<Subscription> subscriptionForCancelSubscription = new AtomicReference<>();
Mono.delay( subscriptionTimeout )
.doOnSubscribe( subscriptionForCancelSubscription::set )
.subscribe( x -> connectionDisposable.dispose() );
// The message sending is done here...
// (3)
return result
.doOnSubscribe(s ->subscriptionForCancelSubscription.get().cancel())
.doFinally( signalType -> connectionDisposable.dispose() );
}
我正在使用一个 ConnectableFlux,它可以立即连接到流,无需订阅,它设置为使用 reply() 方法来存储所有消息,因此稍后的任何订阅者都不会错过响应消息 (1)。
可以执行的路径很少:
- 方法已调用,但未对通量执行任何订阅
- 解决方案 - 如果没有完成订阅,有一个计时器会在 5 秒后删除连接的通量资源。 (2)
方法被调用并订阅了flux
2.1。没有返回消息
- 解决方案 - 设置了获取响应的超时时间 (
.timeout( responseTimeout )
)。之后.doFinally(..)
清理资源 (1)(3).
2.2。部分回复信息已返回
- 解决方案 - 与 2.1 相同。
2.3。已返回所有响应消息
- 解决方案 - 由于达到最大元素数 (
.take( maxResponsesCount )
) 而执行doFinally()
(1)(3)
- 解决方案 - 设置了获取响应的超时时间 (
我还没有对此进行一些认真的测试,如果出现问题,我会在这个答案中添加更正。