在 Android 中使用 RxJava 排队任务
Queuing tasks with RxJava in Android
我正在为 Android 开发具有后台数据同步功能的应用程序。
我目前正在使用 RxJava 定期 post 服务器上的一些数据。
除此之外,我想为用户提供一个按钮 "force sync",它将立即触发同步。
我知道如何使用 Observable.interval()
定期推送数据,我会知道如何使用 Observalbe.just()
推送那个强制的数据,但如果发生这种情况,我想将它们排队前一个仍在运行时触发了一个。
所以让我们以1分钟为自动同步间隔为例,假设同步持续40秒(我在这里夸大了只是为了更容易理解)。现在,如果有任何机会,用户在自动仍然是 运行 时按下 "force" 按钮(反之亦然 - 当强制按钮仍然是 运行 时自动触发),我想要将第二个同步请求排队等待第一个同步请求完成。
我画了这幅图,可能会给它更多的视角:
如您所见,自动触发(由某些 Observable.interval()
),并且在同步过程中,用户按下 "force" 按钮。现在我们要等待第一个请求完成,然后再次开始强制请求。
有一次,虽然强制请求是 运行,但新的自动请求再次被触发,只是将其添加到队列中。在队列中的最后一个完成后,一切都停止了,然后稍后又安排了自动。
希望有人能指出我正确的操作员如何做到这一点。我试过 Observable.combineLatest()
,但是队列列表在开始时被调度,当我将新的同步添加到队列时,它在上一个操作完成时没有继续。
非常感谢任何帮助,
达科
您可以通过将计时器与单击按钮 Observable
/Subject
合并来实现此目的,使用 onBackpressureBuffer
的排队效果并将处理连接到其中以确保运行一次一个。
PublishSubject<Long> subject = PublishSubject.create();
Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS);
periodic.mergeWith(subject)
.onBackpressureBuffer()
.concatMap(new Func1<Long, Observable<Integer>>() {
@Override
public Observable<Integer> call(Long v) {
// simulates the task to run
return Observable.just(1)
.delay(300, TimeUnit.MILLISECONDS);
}
}
).subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(1100);
// user clicks a button
subject.onNext(-1L);
Thread.sleep(800);
虽然有一个公认的答案和一个很好的解决方案,但我想分享另一个使用 Scheduler 和 SingleThreadExecutor 来做到这一点的选项
public static void main(String[] args) throws Exception {
System.out.println(" init ");
Observable<Long> numberObservable =
Observable.interval(700, TimeUnit.MILLISECONDS).take(10);
final Subject subject = PublishSubject.create();
Executor executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor);
numberObservable.observeOn(scheduler).subscribe(subject);
subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"),
onCompleteFunc("subscriber 1"));
Thread.sleep(800);
//simulate action
executor.execute(new Runnable() {
@Override
public void run() {
subject.onNext(333l);
}
});
Thread.sleep(5000);
}
static Action1<Long> onNextFunc(final String who) {
return new Action1<Long>() {
public void call(Long x) {
System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName()
+ " -- " + System.currentTimeMillis());
try {
//simulate some work
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}
static Action1<Throwable> onErrorFunc(final String who) {
return new Action1<Throwable>() {
public void call(Throwable t) {
t.printStackTrace();
}
};
}
static Action0 onCompleteFunc(final String who) {
return new Action0() {
public void call() {
System.out.println(who + " complete");
}
};
}
我正在为 Android 开发具有后台数据同步功能的应用程序。
我目前正在使用 RxJava 定期 post 服务器上的一些数据。
除此之外,我想为用户提供一个按钮 "force sync",它将立即触发同步。
我知道如何使用 Observable.interval()
定期推送数据,我会知道如何使用 Observalbe.just()
推送那个强制的数据,但如果发生这种情况,我想将它们排队前一个仍在运行时触发了一个。
所以让我们以1分钟为自动同步间隔为例,假设同步持续40秒(我在这里夸大了只是为了更容易理解)。现在,如果有任何机会,用户在自动仍然是 运行 时按下 "force" 按钮(反之亦然 - 当强制按钮仍然是 运行 时自动触发),我想要将第二个同步请求排队等待第一个同步请求完成。
我画了这幅图,可能会给它更多的视角:
如您所见,自动触发(由某些 Observable.interval()
),并且在同步过程中,用户按下 "force" 按钮。现在我们要等待第一个请求完成,然后再次开始强制请求。
有一次,虽然强制请求是 运行,但新的自动请求再次被触发,只是将其添加到队列中。在队列中的最后一个完成后,一切都停止了,然后稍后又安排了自动。
希望有人能指出我正确的操作员如何做到这一点。我试过 Observable.combineLatest()
,但是队列列表在开始时被调度,当我将新的同步添加到队列时,它在上一个操作完成时没有继续。
非常感谢任何帮助, 达科
您可以通过将计时器与单击按钮 Observable
/Subject
合并来实现此目的,使用 onBackpressureBuffer
的排队效果并将处理连接到其中以确保运行一次一个。
PublishSubject<Long> subject = PublishSubject.create();
Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS);
periodic.mergeWith(subject)
.onBackpressureBuffer()
.concatMap(new Func1<Long, Observable<Integer>>() {
@Override
public Observable<Integer> call(Long v) {
// simulates the task to run
return Observable.just(1)
.delay(300, TimeUnit.MILLISECONDS);
}
}
).subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(1100);
// user clicks a button
subject.onNext(-1L);
Thread.sleep(800);
虽然有一个公认的答案和一个很好的解决方案,但我想分享另一个使用 Scheduler 和 SingleThreadExecutor 来做到这一点的选项
public static void main(String[] args) throws Exception {
System.out.println(" init ");
Observable<Long> numberObservable =
Observable.interval(700, TimeUnit.MILLISECONDS).take(10);
final Subject subject = PublishSubject.create();
Executor executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor);
numberObservable.observeOn(scheduler).subscribe(subject);
subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"),
onCompleteFunc("subscriber 1"));
Thread.sleep(800);
//simulate action
executor.execute(new Runnable() {
@Override
public void run() {
subject.onNext(333l);
}
});
Thread.sleep(5000);
}
static Action1<Long> onNextFunc(final String who) {
return new Action1<Long>() {
public void call(Long x) {
System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName()
+ " -- " + System.currentTimeMillis());
try {
//simulate some work
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}
static Action1<Throwable> onErrorFunc(final String who) {
return new Action1<Throwable>() {
public void call(Throwable t) {
t.printStackTrace();
}
};
}
static Action0 onCompleteFunc(final String who) {
return new Action0() {
public void call() {
System.out.println(who + " complete");
}
};
}