在 Android 中使用 RxJava 排队任务

Queuing tasks with RxJava in Android

我正在为 Android 开发具有后台数据同步功能的应用程序。 我目前正在使用 RxJava 定期 post 服务器上的一些数据。 除此之外,我想为用户提供一个按钮 "force sync",它将立即触发同步。 我知道如何使用 Observable.interval() 定期推送数据,我会知道如何使用 Observalbe.just() 推送那个强制的数据,但如果发生这种情况,我想将它们排队前一个仍在运行时触发了一个。

所以让我们以1分钟为自动同步间隔为例,假设同步持续40秒(我在这里夸大了只是为了更容易理解)。现在,如果有任何机会,用户在自动仍然是 运行 时按下 "force" 按钮(反之亦然 - 当强制按钮仍然是 运行 时自动触发),我想要将第二个同步请求排队等待第一个同步请求完成。

我画了这幅图,可能会给它更多的视角:

如您所见,自动触发(由某些 Observable.interval()),并且在同步过程中,用户按下 "force" 按钮。现在我们要等待第一个请求完成,然后再次开始强制请求。 有一次,虽然强制请求是 运行,但新的自动请求再次被触发,只是将其添加到队列中。在队列中的最后一个完成后,一切都停止了,然后稍后又安排了自动。

希望有人能指出我正确的操作员如何做到这一点。我试过 Observable.combineLatest(),但是队列列表在开始时被调度,当我将新的同步添加到队列时,它在上一个操作完成时没有继续。

非常感谢任何帮助, 达科

您可以通过将计时器与单击按钮 Observable/Subject 合并来实现此目的,使用 onBackpressureBuffer 的排队效果并将处理连接到其中以确保运行一次一个。

PublishSubject<Long> subject = PublishSubject.create();

Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS);

periodic.mergeWith(subject)
.onBackpressureBuffer()
.concatMap(new Func1<Long, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Long v) {
        // simulates the task to run
        return Observable.just(1)
                .delay(300, TimeUnit.MILLISECONDS);
    }
}
).subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(1100);
// user clicks a button
subject.onNext(-1L);

Thread.sleep(800);

虽然有一个公认的答案和一个很好的解决方案,但我想分享另一个使用 Scheduler 和 SingleThreadExecutor 来做到这一点的选项

public static void main(String[] args) throws Exception {
    System.out.println(" init ");
    Observable<Long> numberObservable =
            Observable.interval(700, TimeUnit.MILLISECONDS).take(10);

    final Subject subject = PublishSubject.create();

    Executor executor = Executors.newSingleThreadExecutor();
    Scheduler scheduler = Schedulers.from(executor);
    numberObservable.observeOn(scheduler).subscribe(subject);

    subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"),
                    onCompleteFunc("subscriber 1"));

    Thread.sleep(800);
    //simulate action
    executor.execute(new Runnable() {
        @Override
        public void run() {
            subject.onNext(333l);
        }
    });

    Thread.sleep(5000);
}

static Action1<Long> onNextFunc(final String who) {
    return new Action1<Long>() {
        public void call(Long x) {
            System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName()
                    + " -- " + System.currentTimeMillis());
            try {
                //simulate some work
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
}

static Action1<Throwable> onErrorFunc(final String who) {
    return new Action1<Throwable>() {
        public void call(Throwable t) {
            t.printStackTrace();
        }
    };
}

static Action0 onCompleteFunc(final String who) {
    return new Action0() {
        public void call() {
            System.out.println(who + " complete");
        }
    };
}