在 Observable Items RxJava 之间添加延迟

Adding delay between Observable Items RxJava

我有一个从对象列表创建的可观察对象。对于列表中的每个对象,我都会发出一个网络请求,但我想在列表中的每个项目之间放置一个延迟,以便 space 稍微超出请求。这是我的代码片段。

return Observable.from(documentGroupModels).flatMap(new Func1<DocumentGroupModel, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(DocumentGroupModel documentGroupModel) {
            return refreshDocumentWithUri(documentGroupModel.getUri(), documentGroupModel.sectionGroupId,
                                          includeExceptions, false);
        }
    });

据我所知,在这种情况下使用延迟或缓冲区不太适用。

如果您的延迟是静态的,您可以结合使用 Zipinterval 运算符,这样您就可以发出一个 zip 项目每次配置你的间隔。

查看示例

       @Test
public void delaySteps() {
    long start = System.currentTimeMillis();
    Subscription subscription =
            Observable.zip(Observable.from(Arrays.asList(1, 2, 3)), Observable.interval(200, TimeUnit.MILLISECONDS),
                           (i, t) -> i)
                    .subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start)));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(3000, TimeUnit.MILLISECONDS);
}

你也可以用你的列表创建一个 Observable 并使用 concatMap,然后你可以对每个发出的项目使用 delay。也许这个解决方案更优雅而不是 Hacky

      @Test
public void delayObservableList() {
    Observable.from(Arrays.asList(1, 2, 3, 4, 5))
            .concatMap(s -> Observable.just(s).delay(100, TimeUnit.MILLISECONDS))
            .subscribe(n -> System.out.println(n + " emitted"),
                       e -> {
                       },
                       () -> System.out.println("All emitted"));
    new TestSubscriber().awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);

}

你可以在这里看到另一个延迟的例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/utils/ObservableDelay.java

使用“延迟”运算符,例如

return Observable.from(documentGroupModels).flatMap(new Func1<DocumentGroupModel, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(DocumentGroupModel documentGroupModel) {
            return refreshDocumentWithUri(documentGroupModel.getUri(), documentGroupModel.sectionGroupId,
                                          includeExceptions, false).delay(2000, TimeUnit.MILLISECONDS);
        }
    });