如何不依赖 Reactor 中的订阅时间

How not to depend on subscription time in Reactor

我一直在通读 Reactor 文档,但我无法找到以下问题的正确模式。 我有一个应该异步执行某些操作的方法。我 returns 以 Flux 的形式响应结果,消费者可以订阅它。

方法定义如下:

Flux<ResultMessage> sendRequest(RequestMessage message);

返回的通量是一个热通量,结果可以在任何给定时间异步出现。

潜在消费者可以通过以下方式使用它:

sendRequest(message).subscribe(response->doSomethinWithResponse(response);

一个实现可以是这样的:

Flux<ResultMessage> sendRequest(RequestMessage message) {
   Flux<ResultMessage> result = incomingMessageStream
            .filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
            .take( 2 );
// The message sending is done here...

    return result;
}

其中 incomingMessageStream 是通过此频道的所有消息的 Flux。 这个实现的问题是消费者在结果消息到来之后被订阅,它可能会错过其中的一些。

所以,我正在寻找一种解决方案,让消费者不依赖于订阅时间。可能根本不需要潜在消费者订阅结果 Flux。我正在寻找通用解决方案,但如果不可能,您可以假设生成的消息数不大于 2。

一段时间后,我创建了一个似乎有效的解决方案:

Flux<ResultMessage> sendRequest(RequestMessage message) {
  final int maxResponsesCount = 2;
  final Duration responseTimeout = Duration.ofSeconds( 10 );
  final Duration subscriptionTimeout = Duration.ofSeconds( 5 );

  // (1) 
  ConnectableFlux<ResultMessage> result = incomingMessageStream
      .ofType( ResultMessage.class )
      .filter( resultMessage ->Objects.equals(resultMessage.getId(), message.getId() ) )
      .take( maxResponsesCount )
      .timeout( responseTimeout )
      .replay( maxResponsesCount );
  Disposable connectionDisposable = result.connect();

  // (2)
  AtomicReference<Subscription> subscriptionForCancelSubscription = new AtomicReference<>();
  Mono.delay( subscriptionTimeout )
    .doOnSubscribe( subscriptionForCancelSubscription::set )
    .subscribe( x -> connectionDisposable.dispose() );

  // The message sending is done here...

  // (3)
  return result
    .doOnSubscribe(s ->subscriptionForCancelSubscription.get().cancel())
    .doFinally( signalType -> connectionDisposable.dispose() );
}

我正在使用一个 ConnectableFlux,它可以立即连接到流,无需订阅,它设置为使用 reply() 方法来存储所有消息,因此稍后的任何订阅者都不会错过响应消息 (1)。

可以执行的路径很少:

  1. 方法已调用,但未对通量执行任何订阅
    • 解决方案 - 如果没有完成订阅,有一个计时器会在 5 秒后删除连接的通量资源。 (2)
  2. 方法被调用并订阅了flux

    2.1。没有返回消息

    • 解决方案 - 设置了获取响应的超时时间 (.timeout( responseTimeout ))。之后 .doFinally(..) 清理资源 (1)(3).

    2.2。部分回复信息已返回

    • 解决方案 - 与 2.1 相同。

    2.3。已返回所有响应消息

    • 解决方案 - 由于达到最大元素数 (.take( maxResponsesCount )) 而执行 doFinally() (1)(3)

我还没有对此进行一些认真的测试,如果出现问题,我会在这个答案中添加更正。