发射后退订间隔
Unsubscribe interval after emission
我有一个代码,我在其中设置一个间隔,直到条件完成,然后在订阅中发回结果。
但由于是订阅继续的时间间隔。
我想知道是否有任何方法可以在发出某些内容后取消订阅 Observable 间隔
这里是代码
Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS)
.map(i -> eventHandler.getProcessedEvents())
.filter(eventsProcessed -> eventsProcessed >= 10)
.doOnNext(eventsProcessed -> eventHandler.initProcessedEvents())
.doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed))
.subscribe(t -> resumeRequest(asyncResponse));
new TestSubscriber((Observer) subscriber).awaitTerminalEvent(10, TimeUnit.SECONDS);
subscriber.unsubscribe();
现在我使用计时器然后取消订阅,但很糟糕!
此致
您可以使用 first
运算符
Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS)
.map(i -> eventHandler.getProcessedEvents())
.first(eventsProcessed -> eventsProcessed >= 10)
.doOnNext(eventsProcessed -> eventHandler.initProcessedEvents())
.doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed))
.subscribe(t -> resumeRequest(asyncResponse));
而不是过滤器。这可确保您仅在满足条件时获得一次发射。请注意,如果您的条件间隔 Observable 在没有满足您的条件的情况下终止,您将得到一个异常。
我有一个代码,我在其中设置一个间隔,直到条件完成,然后在订阅中发回结果。
但由于是订阅继续的时间间隔。
我想知道是否有任何方法可以在发出某些内容后取消订阅 Observable 间隔
这里是代码
Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS)
.map(i -> eventHandler.getProcessedEvents())
.filter(eventsProcessed -> eventsProcessed >= 10)
.doOnNext(eventsProcessed -> eventHandler.initProcessedEvents())
.doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed))
.subscribe(t -> resumeRequest(asyncResponse));
new TestSubscriber((Observer) subscriber).awaitTerminalEvent(10, TimeUnit.SECONDS);
subscriber.unsubscribe();
现在我使用计时器然后取消订阅,但很糟糕!
此致
您可以使用 first
运算符
Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS)
.map(i -> eventHandler.getProcessedEvents())
.first(eventsProcessed -> eventsProcessed >= 10)
.doOnNext(eventsProcessed -> eventHandler.initProcessedEvents())
.doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed))
.subscribe(t -> resumeRequest(asyncResponse));
而不是过滤器。这可确保您仅在满足条件时获得一次发射。请注意,如果您的条件间隔 Observable 在没有满足您的条件的情况下终止,您将得到一个异常。