使用 repeat() 测试订阅
Testing subscription with repeat()
我想测试一个在延迟单声道上使用 repeat()
运算符并订阅结果的函数。在测试中,我使用 TestPublisher
来模拟来自单声道的新值。
在一个非常简化的形式中,它看起来像这样:
package de.cronos.mad.messages.backend;
import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;
public class RepeatTest {
private static class TestSubject {
public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
}
}
@Test
public void repeatTest() {
TestPublisher<String> testPublisher = TestPublisher.create();
TestSubject testSubject = new TestSubject();
testSubject.logMonoValues(testPublisher::mono);
testPublisher.emit("Hello");
testPublisher.emit("World");
}
}
在将“Hello”记录到标准输出后执行挂起。我想我理解为什么会发生这种情况:emit(…)
调用从主线程发生并从那里“驱动”订阅。
我不知道如何修改此测试以使其完成,即不挂起?
TestPublisher emit
方法接受值数组。一旦发出,它就会关闭源。所以你不能一一发射。而是像这样传递所有值。
testPublisher.emit("Hello", "world");
// from TestPublisher emit
public final TestPublisher<T> emit(T... values) {
Objects.requireNonNull(values, "values array is null, please cast to T if null T required");
Object[] var2 = values;
int var3 = values.length;
for(int var4 = 0; var4 < var3; ++var4) {
T t = var2[var4];
this.next(t);
}
return this.complete();
}
关于挂起行为,这里的 repeat
似乎是导致问题的原因,因为它无限期地尝试一次又一次地重新连接源 Mono。
Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
只需像这样更改即可了解行为。
Mono.defer(monoSupplier).repeat(10).subscribe(System.out::println);
备注:
.repeat()
运算符在不带参数的情况下使用意味着无限期地重复。一旦原始订阅成功完成,它会立即启动新订阅。终止它的两种方法是:
限制迭代次数,例如作为 .repeat(10)
,根据 vins;
建议
通过发送错误信号来中止序列,例如使用 Mono.error(Throwable)
或仅通过抛出异常。
调用 testPublisher::mono
声明 TestPublisher
的这个实例遵守 Mono
合同,即只发送一个值。
因此两次调用 .emit()
或使用两个参数调用它都无济于事。多余的值将被忽略。
TestPublisher 的 javadoc 说,
TestPublisher are generally hot, [...] replaying the first termination signal to subsequent subscribers.
表示迟到的订阅者会立即收到终止信号。 repeat()
操作员创建的订阅立即收到重播的终止信号,这反过来导致 repeat()
重新订阅,在一个紧密的循环中。
我认为一个可能的解决方案是使用 Mono.defer()
或 Mono.fromSupplier()
为 .repeat()
创建的每个订阅创建一个新值。例如:
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
public class So63117029 {
private static class TestSubject {
public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
}
}
public static void main(String[] args) {
LinkedBlockingQueue<String> data = new LinkedBlockingQueue<>(List.of("Hello", "World"));
TestSubject testSubject = new TestSubject();
testSubject.logMonoValues(() -> Mono.fromSupplier(data::remove));
}
}
方法 Queue.remove()
在队列中没有更多数据时抛出 NoSuchElementException
。
我想测试一个在延迟单声道上使用 repeat()
运算符并订阅结果的函数。在测试中,我使用 TestPublisher
来模拟来自单声道的新值。
在一个非常简化的形式中,它看起来像这样:
package de.cronos.mad.messages.backend;
import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;
public class RepeatTest {
private static class TestSubject {
public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
}
}
@Test
public void repeatTest() {
TestPublisher<String> testPublisher = TestPublisher.create();
TestSubject testSubject = new TestSubject();
testSubject.logMonoValues(testPublisher::mono);
testPublisher.emit("Hello");
testPublisher.emit("World");
}
}
在将“Hello”记录到标准输出后执行挂起。我想我理解为什么会发生这种情况:emit(…)
调用从主线程发生并从那里“驱动”订阅。
我不知道如何修改此测试以使其完成,即不挂起?
TestPublisher emit
方法接受值数组。一旦发出,它就会关闭源。所以你不能一一发射。而是像这样传递所有值。
testPublisher.emit("Hello", "world");
// from TestPublisher emit
public final TestPublisher<T> emit(T... values) {
Objects.requireNonNull(values, "values array is null, please cast to T if null T required");
Object[] var2 = values;
int var3 = values.length;
for(int var4 = 0; var4 < var3; ++var4) {
T t = var2[var4];
this.next(t);
}
return this.complete();
}
关于挂起行为,这里的 repeat
似乎是导致问题的原因,因为它无限期地尝试一次又一次地重新连接源 Mono。
Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
只需像这样更改即可了解行为。
Mono.defer(monoSupplier).repeat(10).subscribe(System.out::println);
备注:
.repeat()
运算符在不带参数的情况下使用意味着无限期地重复。一旦原始订阅成功完成,它会立即启动新订阅。终止它的两种方法是:限制迭代次数,例如作为
建议.repeat(10)
,根据 vins;通过发送错误信号来中止序列,例如使用
Mono.error(Throwable)
或仅通过抛出异常。
调用
testPublisher::mono
声明TestPublisher
的这个实例遵守Mono
合同,即只发送一个值。因此两次调用
.emit()
或使用两个参数调用它都无济于事。多余的值将被忽略。TestPublisher 的 javadoc 说,
TestPublisher are generally hot, [...] replaying the first termination signal to subsequent subscribers.
表示迟到的订阅者会立即收到终止信号。
repeat()
操作员创建的订阅立即收到重播的终止信号,这反过来导致repeat()
重新订阅,在一个紧密的循环中。
我认为一个可能的解决方案是使用 Mono.defer()
或 Mono.fromSupplier()
为 .repeat()
创建的每个订阅创建一个新值。例如:
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
public class So63117029 {
private static class TestSubject {
public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
}
}
public static void main(String[] args) {
LinkedBlockingQueue<String> data = new LinkedBlockingQueue<>(List.of("Hello", "World"));
TestSubject testSubject = new TestSubject();
testSubject.logMonoValues(() -> Mono.fromSupplier(data::remove));
}
}
方法 Queue.remove()
在队列中没有更多数据时抛出 NoSuchElementException
。