Java reactor 如何正确启动 async cancellable sideeffect
Java reactor how to properly start async cancellable sideeffect
我正在尝试使用 reactor 编写一些我知道如何使用可完成的 futures 编写的东西。我在其中收到“在非阻塞范围内调用订阅”警告。
我的目标是在超时后调用 turnOn()
,超时后应调用 turnOff()
。如果再次调用 turnOn(),它应该取消旧的超时并等待新的超时。
我应该怎么做?我可以做一个 hibrate 并使用 CompletableFuture 来超时,但是 reactor 的 api 更容易一些。
此测试按预期工作:
public class TimeoutTest {
Service service;
@BeforeEach
public void setUp() {
service = mock(Service.class);
}
CompletableFuture<Void> turnOffFuture = null;
@DisplayName("Should timeout on turnOn with timeout")
@Test
public void timeoutCompletableFuture() throws InterruptedException {
turnOn(Duration.ofMillis(100)).join();
verify(service).turnOn();
verify(service,never()).turnOff();
Thread.sleep(1000);
verify(service).turnOff();
}
private interface Service{
void turnOn();
void turnOff();
}
public void cancelTimeout() {
if (turnOffFuture != null)
turnOffFuture.cancel(false);
turnOffFuture = null;
}
public CompletableFuture<Void> turnOn(Duration timeout) {
CompletableFuture<Void> turnOnFuture = turnOn();
cancelTimeout();
turnOffFuture = turnOnFuture.thenRun(() -> delay(timeout))
.thenRun(this::turnOff);
return turnOnFuture;
}
private void delay(Duration duration) {
try {
Thread.sleep(BigDecimal.valueOf(duration.getSeconds())
.scaleByPowerOfTen(3)
.add(BigDecimal.valueOf(duration.getNano(), 6))
.intValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private CompletableFuture<Void> turnOn() {
return CompletableFuture.runAsync(() -> service.turnOn());
}
private CompletableFuture<Void> turnOff() {
return CompletableFuture.runAsync(() -> service.turnOff());
}
}
但我的反应堆代码没有。
public class TimeoutMonoTest {
Service service;
@BeforeEach
public void setUp() {
service = mock(Service.class);
}
Disposable turnOffDisposable = null;
@DisplayName("Should timeout on turnOn with timeout")
@Test
public void timeoutMono() throws InterruptedException {
turnOn(Duration.ofMillis(100)).block(Duration.ofMillis(10));
verify(service).turnOn();
verify(service, never()).turnOff();
Thread.sleep(1000);
verify(service).turnOff();
}
private interface Service {
void turnOn();
void turnOff();
}
public void cancelTimeout() {
if (turnOffDisposable != null)
turnOffDisposable.dispose();
turnOffDisposable = null;
}
public Mono<Void> turnOn(Duration timeout) {
Mono<Void> turnOnFuture = turnOn();
cancelTimeout();
turnOffDisposable = turnOnFuture.delayElement(timeout)
.subscribe(it -> this.turnOff());
return turnOnFuture;
}
private Mono<Void> turnOn() {
service.turnOn();
return Mono.just("not empty but mapped to void").then();
}
private Mono<Void> turnOff() {
service.turnOff();
return Mono.just("not empty but mapped to void").then();
}
}
问题在于 turnOn()
和 turnOff()
方法中到 void mono 的映射。他们实际上并没有得到“下一个”信号,只是一个“成功”信号。
修复只是将 turnOn 方法更改为:
public Mono<Void> turnOn(Duration timeout) {
cancelTimeout();
Mono<Void> turnOnMono = turnOn();
turnOffDisposable = turnOnMono.delayElement(timeout)
.then(turnOff())
.subscribe();
return turnOn();
}
我正在尝试使用 reactor 编写一些我知道如何使用可完成的 futures 编写的东西。我在其中收到“在非阻塞范围内调用订阅”警告。
我的目标是在超时后调用 turnOn()
,超时后应调用 turnOff()
。如果再次调用 turnOn(),它应该取消旧的超时并等待新的超时。
我应该怎么做?我可以做一个 hibrate 并使用 CompletableFuture 来超时,但是 reactor 的 api 更容易一些。
此测试按预期工作:
public class TimeoutTest {
Service service;
@BeforeEach
public void setUp() {
service = mock(Service.class);
}
CompletableFuture<Void> turnOffFuture = null;
@DisplayName("Should timeout on turnOn with timeout")
@Test
public void timeoutCompletableFuture() throws InterruptedException {
turnOn(Duration.ofMillis(100)).join();
verify(service).turnOn();
verify(service,never()).turnOff();
Thread.sleep(1000);
verify(service).turnOff();
}
private interface Service{
void turnOn();
void turnOff();
}
public void cancelTimeout() {
if (turnOffFuture != null)
turnOffFuture.cancel(false);
turnOffFuture = null;
}
public CompletableFuture<Void> turnOn(Duration timeout) {
CompletableFuture<Void> turnOnFuture = turnOn();
cancelTimeout();
turnOffFuture = turnOnFuture.thenRun(() -> delay(timeout))
.thenRun(this::turnOff);
return turnOnFuture;
}
private void delay(Duration duration) {
try {
Thread.sleep(BigDecimal.valueOf(duration.getSeconds())
.scaleByPowerOfTen(3)
.add(BigDecimal.valueOf(duration.getNano(), 6))
.intValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private CompletableFuture<Void> turnOn() {
return CompletableFuture.runAsync(() -> service.turnOn());
}
private CompletableFuture<Void> turnOff() {
return CompletableFuture.runAsync(() -> service.turnOff());
}
}
但我的反应堆代码没有。
public class TimeoutMonoTest {
Service service;
@BeforeEach
public void setUp() {
service = mock(Service.class);
}
Disposable turnOffDisposable = null;
@DisplayName("Should timeout on turnOn with timeout")
@Test
public void timeoutMono() throws InterruptedException {
turnOn(Duration.ofMillis(100)).block(Duration.ofMillis(10));
verify(service).turnOn();
verify(service, never()).turnOff();
Thread.sleep(1000);
verify(service).turnOff();
}
private interface Service {
void turnOn();
void turnOff();
}
public void cancelTimeout() {
if (turnOffDisposable != null)
turnOffDisposable.dispose();
turnOffDisposable = null;
}
public Mono<Void> turnOn(Duration timeout) {
Mono<Void> turnOnFuture = turnOn();
cancelTimeout();
turnOffDisposable = turnOnFuture.delayElement(timeout)
.subscribe(it -> this.turnOff());
return turnOnFuture;
}
private Mono<Void> turnOn() {
service.turnOn();
return Mono.just("not empty but mapped to void").then();
}
private Mono<Void> turnOff() {
service.turnOff();
return Mono.just("not empty but mapped to void").then();
}
}
问题在于 turnOn()
和 turnOff()
方法中到 void mono 的映射。他们实际上并没有得到“下一个”信号,只是一个“成功”信号。
修复只是将 turnOn 方法更改为:
public Mono<Void> turnOn(Duration timeout) {
cancelTimeout();
Mono<Void> turnOnMono = turnOn();
turnOffDisposable = turnOnMono.delayElement(timeout)
.then(turnOff())
.subscribe();
return turnOn();
}