使用 repeat() 测试订阅

Testing subscription with repeat()

我想测试一个在延迟单声道上使用 repeat() 运算符并订阅结果的函数。在测试中,我使用 TestPublisher 来模拟来自单声道的新值。

在一个非常简化的形式中,它看起来像这样:

package de.cronos.mad.messages.backend;

import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;

public class RepeatTest {

    private static class TestSubject {
        public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
            Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
        }
    }

    @Test
    public void repeatTest() {
        TestPublisher<String> testPublisher = TestPublisher.create();

        TestSubject testSubject = new TestSubject();
        testSubject.logMonoValues(testPublisher::mono);

        testPublisher.emit("Hello");
        testPublisher.emit("World");
    }
}

在将“Hello”记录到标准输出后执行挂起。我想我理解为什么会发生这种情况:emit(…) 调用从主线程发生并从那里“驱动”订阅。

我不知道如何修改此测试以使其完成,即不挂起?

TestPublisher emit 方法接受值数组。一旦发出,它就会关闭源。所以你不能一一发射。而是像这样传递所有值。

testPublisher.emit("Hello", "world");

// from TestPublisher emit
public final TestPublisher<T> emit(T... values) {
    Objects.requireNonNull(values, "values array is null, please cast to T if null T required");
    Object[] var2 = values;
    int var3 = values.length;

    for(int var4 = 0; var4 < var3; ++var4) {
        T t = var2[var4];
        this.next(t);
    }

    return this.complete();
}

关于挂起行为,这里的 repeat 似乎是导致问题的原因,因为它无限期地尝试一次又一次地重新连接源 Mono。

Mono.defer(monoSupplier).repeat().subscribe(System.out::println);

只需像这样更改即可了解行为。

Mono.defer(monoSupplier).repeat(10).subscribe(System.out::println);

备注:

  1. .repeat() 运算符在不带参数的情况下使用意味着无限期地重复。一旦原始订阅成功完成,它会立即启动新订阅。终止它的两种方法是:

    1. 限制迭代次数,例如作为 .repeat(10),根据 vins;

      建议
    2. 通过发送错误信号来中止序列,例如使用 Mono.error(Throwable) 或仅通过抛出异常。

  2. 调用 testPublisher::mono 声明 TestPublisher 的这个实例遵守 Mono 合同,即只发送一个值。

    因此两次调用 .emit() 或使用两个参数调用它都无济于事。多余的值将被忽略。

  3. TestPublisher 的 javadoc 说,

    TestPublisher are generally hot, [...] replaying the first termination signal to subsequent subscribers.

    表示迟到的订阅者会立即收到终止信号。 repeat() 操作员创建的订阅立即收到重播的终止信号,这反过来导致 repeat() 重新订阅,在一个紧密的循环中。

我认为一个可能的解决方案是使用 Mono.defer()Mono.fromSupplier().repeat() 创建的每个订阅创建一个新值。例如:

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;

public class So63117029 {

  private static class TestSubject {
    public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
      Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
    }
  }

  public static void main(String[] args) {
    LinkedBlockingQueue<String> data = new LinkedBlockingQueue<>(List.of("Hello", "World"));

    TestSubject testSubject = new TestSubject();
    testSubject.logMonoValues(() -> Mono.fromSupplier(data::remove));
  }
}

方法 Queue.remove() 在队列中没有更多数据时抛出 NoSuchElementException