发射后退订间隔

Unsubscribe interval after emission

我有一个代码,我在其中设置一个间隔,直到条件完成,然后在订阅中发回结果。

但由于是订阅继续的时间间隔。

我想知道是否有任何方法可以在发出某些内容后取消订阅 Observable 间隔

这里是代码

Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS)
                .map(i -> eventHandler.getProcessedEvents())
                .filter(eventsProcessed -> eventsProcessed >= 10)
                .doOnNext(eventsProcessed -> eventHandler.initProcessedEvents())
                .doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed))
                .subscribe(t -> resumeRequest(asyncResponse));
        new TestSubscriber((Observer) subscriber).awaitTerminalEvent(10, TimeUnit.SECONDS);
subscriber.unsubscribe();

现在我使用计时器然后取消订阅,但很糟糕!

此致

您可以使用 first 运算符

Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS)
                .map(i -> eventHandler.getProcessedEvents())
                .first(eventsProcessed -> eventsProcessed >= 10)
                .doOnNext(eventsProcessed -> eventHandler.initProcessedEvents())
                .doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed))
                .subscribe(t -> resumeRequest(asyncResponse));

而不是过滤器。这可确保您仅在满足条件时获得一次发射。请注意,如果您的条件间隔 Observable 在没有满足您的条件的情况下终止,您将得到一个异常。