从多个任务中创建一个可观察对象
Create one observable from several tasks
我想创建一个方法,它接受一个 S
和 return 一个 Observable<T>
,在三个(异步)任务完成后用一个值完成。
这三个任务都运行在队列上,在不同的线程中有自己的消费者。
这是我的想法:
- 任务 1 通过队列接收
<S, Subject<T>>
并从 S
计算值并在 Subject<T>
中设置 onNext
然后调用 onComplete()
.
- 任务 2 和 3 收到
<S, Subject<Void>>
并在完成工作后调用 onComplete()
。
任务 1、2 和 3 可以 运行 独立完成,并按随机顺序完成。
现在我有三个科目,一个 T
,两个 Void
。我想 return 第一个,但只让它在 所有 任务完成后发出值 T
。这是因为在所有任务完成之前,我不希望可观察对象的任何订阅者对 T
做任何事情。
组合 Subject 以实现此行为的正确方法是什么?我可以使用 CountDownLatch
等轻松地将其破解在一起,但希望有一种 rx-native 的方法来解决这个问题。
我计划通过队列使用主题作为回调是正确的方法吗?我曾经为此使用 CompletableFuture<T>
,但我想迁移到 RX。
我不完全确定主题在哪里发挥作用,但您可以使用 When
+ And
+ Then
同步三个任务
public IObservable<T> MyMethod<S, T>(S incoming) {
//Create a new plan
return Observable.When(
//Start with Task one which will return an T from an S
Observable.FromAsync(async () => await SomeTaskToTurnSIntoT(incoming))
//Add in Task two which returns a System.Reactive.Unit
.And(Observable.FromAsync(() => /*Do Task 2*/))
//Same for Task 3
.And(Observable.FromAsync(() => /*Do Task 3*/))
//Only emit the item from the first Task.
.Then((ret, _, __) => ret))
//Finally we only want this to process once, then we will reuse the
//existing value for subsequent subscribers
.PublishLast().RefCount();
}
以上将等到所有三个项目都完成后才会发出。需要注意的一件事是,在 Rx 中,Void
对象实际上是一个 System.Reactive.Unit
,所以如果没有值,你应该返回它。
您可以编写自己的 Subject
来为您执行此操作,但由于多种原因通常不鼓励这样做。相反,您还可以连接任务生成的三个 Observables/Subjects。这个操作产生一个 Observable
,它发出这些任务产生的所有值,并且只有在所有输入 Observables 完成时才完成。
由于它们的类型略有不同,您需要使用 map()
.
更改后两个任务生成的 Observable 的签名
Observable<T> output = Observable.concat(t1,
t2.map(in -> null).ignoreElements(),
t3.map(in -> null).ignoreElements());
如果你想等待任何订阅者使用产生的值,直到所有 Observables 都完成,你可以在这个 Observable
.
上调用 last()
方法
Observable<T> output = Observable.concat(t1,
t2.map(in -> null).ignoreElements(),
t3.map(in -> null).ignoreElements()).last();
不要为此使用 Subject
s。而是使用异步调度程序合并可观察对象。所以你有:
T task1(S s);
void task2(S s);
void task3(S s);
然后
<S,T> Observable<T> get(S s) {
return Observable.merge(
Observable.just(s)
.map(x -> task1(x))
.subscribeOn(Schedulers.computation()),
(Observable<T>) Observable.just(s)
.doOnNext(x -> task2(x))
.ignoreElements()
.cast(Object.class)
.subscribeOn(Schedulers.computation()),
(Observable<T>) Observable.just(s)
.doOnNext(x -> task3(x))
.ignoreElements()
.cast(Object.class)
.subscribeOn(Schedulers.computation()))
// wait for completion before emitting the single value
.last();
}
我想创建一个方法,它接受一个 S
和 return 一个 Observable<T>
,在三个(异步)任务完成后用一个值完成。
这三个任务都运行在队列上,在不同的线程中有自己的消费者。
这是我的想法:
- 任务 1 通过队列接收
<S, Subject<T>>
并从S
计算值并在Subject<T>
中设置onNext
然后调用onComplete()
. - 任务 2 和 3 收到
<S, Subject<Void>>
并在完成工作后调用onComplete()
。
任务 1、2 和 3 可以 运行 独立完成,并按随机顺序完成。
现在我有三个科目,一个 T
,两个 Void
。我想 return 第一个,但只让它在 所有 任务完成后发出值 T
。这是因为在所有任务完成之前,我不希望可观察对象的任何订阅者对 T
做任何事情。
组合 Subject 以实现此行为的正确方法是什么?我可以使用 CountDownLatch
等轻松地将其破解在一起,但希望有一种 rx-native 的方法来解决这个问题。
我计划通过队列使用主题作为回调是正确的方法吗?我曾经为此使用 CompletableFuture<T>
,但我想迁移到 RX。
我不完全确定主题在哪里发挥作用,但您可以使用 When
+ And
+ Then
public IObservable<T> MyMethod<S, T>(S incoming) {
//Create a new plan
return Observable.When(
//Start with Task one which will return an T from an S
Observable.FromAsync(async () => await SomeTaskToTurnSIntoT(incoming))
//Add in Task two which returns a System.Reactive.Unit
.And(Observable.FromAsync(() => /*Do Task 2*/))
//Same for Task 3
.And(Observable.FromAsync(() => /*Do Task 3*/))
//Only emit the item from the first Task.
.Then((ret, _, __) => ret))
//Finally we only want this to process once, then we will reuse the
//existing value for subsequent subscribers
.PublishLast().RefCount();
}
以上将等到所有三个项目都完成后才会发出。需要注意的一件事是,在 Rx 中,Void
对象实际上是一个 System.Reactive.Unit
,所以如果没有值,你应该返回它。
您可以编写自己的 Subject
来为您执行此操作,但由于多种原因通常不鼓励这样做。相反,您还可以连接任务生成的三个 Observables/Subjects。这个操作产生一个 Observable
,它发出这些任务产生的所有值,并且只有在所有输入 Observables 完成时才完成。
由于它们的类型略有不同,您需要使用 map()
.
Observable<T> output = Observable.concat(t1,
t2.map(in -> null).ignoreElements(),
t3.map(in -> null).ignoreElements());
如果你想等待任何订阅者使用产生的值,直到所有 Observables 都完成,你可以在这个 Observable
.
last()
方法
Observable<T> output = Observable.concat(t1,
t2.map(in -> null).ignoreElements(),
t3.map(in -> null).ignoreElements()).last();
不要为此使用 Subject
s。而是使用异步调度程序合并可观察对象。所以你有:
T task1(S s);
void task2(S s);
void task3(S s);
然后
<S,T> Observable<T> get(S s) {
return Observable.merge(
Observable.just(s)
.map(x -> task1(x))
.subscribeOn(Schedulers.computation()),
(Observable<T>) Observable.just(s)
.doOnNext(x -> task2(x))
.ignoreElements()
.cast(Object.class)
.subscribeOn(Schedulers.computation()),
(Observable<T>) Observable.just(s)
.doOnNext(x -> task3(x))
.ignoreElements()
.cast(Object.class)
.subscribeOn(Schedulers.computation()))
// wait for completion before emitting the single value
.last();
}