RxJava 并行计算的排序输出
RxJava sorted output from parallell computation
我有一个要并行执行的任务列表,但我想以与原始列表相同的顺序显示任务的结果。
换句话说,如果我有任务列表 [A,B,C],我不想在显示 A-result 之前显示 B-result,但我也不想等到 A-task 完成后再开始 B -任务。
另外,我想尽快显示每个结果,换句话说,如果任务完成顺序是B,然后是A,然后是C,我不想在收到B结果时显示任何内容,然后在我收到 A-result 时立即显示 A-result,然后显示 B-result,然后在我收到它时显示 C-result。
通过为每个任务创建一个 Observable,将它们与合并合并,并在计算线程池上订阅,然后编写一个 Subscriber 来为接收到的任何乱序结果保存一个缓冲区,这当然不是一件非常棘手的事情.然而,Rx 的经验法则往往是 "there's already an operator for that",所以问题是 "what is the proper RxJava way to solve this?" 是否确实存在这样的事情。
"There's not quite an operator for that"。虽然,在 1.0.15-SNAPSHOT 版本中有一个实验性的 concatEagar()
运算符,但听起来它可以满足您的需求。 Pull request for concatEager
repositories {
maven { url 'https://oss.jfrog.org/libs-snapshot' }
}
dependencies {
compile 'io.reactivex:rxjava:1.0.15-SNAPSHOT'
}
如果您想推出自己的临时解决方案,直到 concatEager()
获得批准。你可以尝试这样的事情:
public Observable<Result> concatEager(final Observable<Result> taskA, final Observable<Result> taskB, final Observable<Result> taskC) {
return Observable
.create(subscriber -> {
final Observable<Result> taskACached = taskA.cache();
final Observable<Result> taskBCached = taskB.cache();
final Observable<Result> taskCCached = taskC.cache();
// Kick off all the tasks simultaneously.
subscriber.add(
Observable
.merge(taskACached, taskBCached, taskCCached)
.subscribe(
result -> { // Throw away result
},
throwable -> { // Ignore errors
}
)
);
// Put the results in order.
subscriber.add(
Observable
.concat(taskACached, taskBCached, taskCCached)
.subscribe(subscriber)
);
});
}
请注意,以上代码完全未经测试。可能有更好的方法来做到这一点,但这是我首先想到的...
看来您需要 concatEager
来完成此任务,但使用 1.0.15 之前的工具可以实现它,而不需要 "creating" Observables。这是一个例子:
Observable<Long> source1 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(10);
Observable<Long> source2 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(20);
Observable<Long> source3 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(15);
Observable<Observable<Long>> sources = Observable.just(source1, source2, source3);
sources.map(v -> {
Observable<Long> c = v.cache();
c.subscribe(); // to cache all
return c;
})
.onBackpressureBuffer() // make sure all source started
.concatMap(v -> v)
.toBlocking()
.forEach(System.out::println);
缺点是它会在整个序列持续时间内保留所有值。这可以通过一种特殊的 Subject 来解决:UnicastSubject
但是 RxJava 1.x 没有并且可能没有 "officially"。但是,您可以查看 my blog posts 之一并自己构建 if 并具有以下代码:
//...
sources.map(v -> {
UnicastSubject<Long> subject = UnicastSubject.create();
v.subscribe(subject);
return subject;
})
//...
我有一个要并行执行的任务列表,但我想以与原始列表相同的顺序显示任务的结果。 换句话说,如果我有任务列表 [A,B,C],我不想在显示 A-result 之前显示 B-result,但我也不想等到 A-task 完成后再开始 B -任务。
另外,我想尽快显示每个结果,换句话说,如果任务完成顺序是B,然后是A,然后是C,我不想在收到B结果时显示任何内容,然后在我收到 A-result 时立即显示 A-result,然后显示 B-result,然后在我收到它时显示 C-result。
通过为每个任务创建一个 Observable,将它们与合并合并,并在计算线程池上订阅,然后编写一个 Subscriber 来为接收到的任何乱序结果保存一个缓冲区,这当然不是一件非常棘手的事情.然而,Rx 的经验法则往往是 "there's already an operator for that",所以问题是 "what is the proper RxJava way to solve this?" 是否确实存在这样的事情。
"There's not quite an operator for that"。虽然,在 1.0.15-SNAPSHOT 版本中有一个实验性的 concatEagar()
运算符,但听起来它可以满足您的需求。 Pull request for concatEager
repositories {
maven { url 'https://oss.jfrog.org/libs-snapshot' }
}
dependencies {
compile 'io.reactivex:rxjava:1.0.15-SNAPSHOT'
}
如果您想推出自己的临时解决方案,直到 concatEager()
获得批准。你可以尝试这样的事情:
public Observable<Result> concatEager(final Observable<Result> taskA, final Observable<Result> taskB, final Observable<Result> taskC) {
return Observable
.create(subscriber -> {
final Observable<Result> taskACached = taskA.cache();
final Observable<Result> taskBCached = taskB.cache();
final Observable<Result> taskCCached = taskC.cache();
// Kick off all the tasks simultaneously.
subscriber.add(
Observable
.merge(taskACached, taskBCached, taskCCached)
.subscribe(
result -> { // Throw away result
},
throwable -> { // Ignore errors
}
)
);
// Put the results in order.
subscriber.add(
Observable
.concat(taskACached, taskBCached, taskCCached)
.subscribe(subscriber)
);
});
}
请注意,以上代码完全未经测试。可能有更好的方法来做到这一点,但这是我首先想到的...
看来您需要 concatEager
来完成此任务,但使用 1.0.15 之前的工具可以实现它,而不需要 "creating" Observables。这是一个例子:
Observable<Long> source1 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(10);
Observable<Long> source2 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(20);
Observable<Long> source3 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(15);
Observable<Observable<Long>> sources = Observable.just(source1, source2, source3);
sources.map(v -> {
Observable<Long> c = v.cache();
c.subscribe(); // to cache all
return c;
})
.onBackpressureBuffer() // make sure all source started
.concatMap(v -> v)
.toBlocking()
.forEach(System.out::println);
缺点是它会在整个序列持续时间内保留所有值。这可以通过一种特殊的 Subject 来解决:UnicastSubject
但是 RxJava 1.x 没有并且可能没有 "officially"。但是,您可以查看 my blog posts 之一并自己构建 if 并具有以下代码:
//...
sources.map(v -> {
UnicastSubject<Long> subject = UnicastSubject.create();
v.subscribe(subject);
return subject;
})
//...