终止对外部事件的订阅
Terminate subscription on external event
在应用程序中,我对外部 HTTP 端点使用长轮询。我使用 Spring 的反应式 WebClient
来做到这一点。为了在应用程序停止时干净地关闭(并避免丑陋的 Netty 堆栈跟踪),我使用 takeUntil()
和一个 EmitterProcessor
的实例,我在 Spring 时调用 onNext()
停止我的 bean(我实现 SmartLifecycle
)。
整个过程是这样的:
@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
private boolean running = true;
private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();
private final BackendMessageReceiver backendMessageReceiver;
public void waitForMessages() {
Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
.repeat()
.takeUntilOther(shutdown)
.subscribe(event -> {
// do something when the http endpoint answers
});
}
@Override
public int getPhase() {
// We need to cancel the subscriptions before Reactor/Netty shuts down.
// Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
return 0;
}
@Override
public void start() {
// Not needed
}
@Override
public void stop() {
log.info("Stopping message subscriptions");
shutdown.onNext(true);
shutdown.onComplete();
running = false;
}
@Override
public boolean isRunning() {
return running;
}
}
目前整个机制运行良好。但是,EmitterProcessor
被标记为 @Deprecated
并且 javadoc 说使用 Sink
代替。 Sink
未实现 Publisher
接口,因此无法传递到 takeUntilOther()
。
我应该怎么做才能解决这个问题,而不会永远停留在 Project Reactor < 3.5 上?
Sinks
旨在作为面向开发人员的 API 以编程方式触发反应事件。如果没有办法将它们作为典型的 Flux
或 Mono
呈现给应用程序的其余部分,这将不会很有用。
Sinks.Many
对此有一个 asFlux()
的看法。同样,Sinks.One
和 Sinks.Empty
有一个 asMono()
视图。
这就是您可以用来传递给 takeUntilOther
的内容。
在应用程序中,我对外部 HTTP 端点使用长轮询。我使用 Spring 的反应式 WebClient
来做到这一点。为了在应用程序停止时干净地关闭(并避免丑陋的 Netty 堆栈跟踪),我使用 takeUntil()
和一个 EmitterProcessor
的实例,我在 Spring 时调用 onNext()
停止我的 bean(我实现 SmartLifecycle
)。
整个过程是这样的:
@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
private boolean running = true;
private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();
private final BackendMessageReceiver backendMessageReceiver;
public void waitForMessages() {
Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
.repeat()
.takeUntilOther(shutdown)
.subscribe(event -> {
// do something when the http endpoint answers
});
}
@Override
public int getPhase() {
// We need to cancel the subscriptions before Reactor/Netty shuts down.
// Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
return 0;
}
@Override
public void start() {
// Not needed
}
@Override
public void stop() {
log.info("Stopping message subscriptions");
shutdown.onNext(true);
shutdown.onComplete();
running = false;
}
@Override
public boolean isRunning() {
return running;
}
}
目前整个机制运行良好。但是,EmitterProcessor
被标记为 @Deprecated
并且 javadoc 说使用 Sink
代替。 Sink
未实现 Publisher
接口,因此无法传递到 takeUntilOther()
。
我应该怎么做才能解决这个问题,而不会永远停留在 Project Reactor < 3.5 上?
Sinks
旨在作为面向开发人员的 API 以编程方式触发反应事件。如果没有办法将它们作为典型的 Flux
或 Mono
呈现给应用程序的其余部分,这将不会很有用。
Sinks.Many
对此有一个 asFlux()
的看法。同样,Sinks.One
和 Sinks.Empty
有一个 asMono()
视图。
这就是您可以用来传递给 takeUntilOther
的内容。