从多个任务中创建一个可观察对象

Create one observable from several tasks

我想创建一个方法,它接受一个 S 和 return 一个 Observable<T>,在三个(异步)任务完成后用一个值完成。

这三个任务都运行在队列上,在不同的线程中有自己的消费者。

这是我的想法:

任务 1、2 和 3 可以 运行 独立完成,并按随机顺序完成。

现在我有三个科目,一个 T,两个 Void。我想 return 第一个,但只让它在 所有 任务完成后发出值 T。这是因为在所有任务完成之前,我不希望可观察对象的任何订阅者对 T 做任何事情。

组合 Subject 以实现此行为的正确方法是什么?我可以使用 CountDownLatch 等轻松地将其破解在一起,但希望有一种 rx-native 的方法来解决这个问题。

我计划通过队列使用主题作为回调是正确的方法吗?我曾经为此使用 CompletableFuture<T>,但我想迁移到 RX。

我不完全确定主题在哪里发挥作用,但您可以使用 When + And + Then

同步三个任务
public IObservable<T> MyMethod<S, T>(S incoming) {

  //Create a new plan
  return Observable.When(

   //Start with Task one which will return an T from an S
   Observable.FromAsync(async () => await SomeTaskToTurnSIntoT(incoming))

   //Add in Task two which returns a System.Reactive.Unit
   .And(Observable.FromAsync(() => /*Do Task 2*/))

   //Same for Task 3
   .And(Observable.FromAsync(() => /*Do Task 3*/))

   //Only emit the item from the first Task.
   .Then((ret, _, __) => ret))

   //Finally we only want this to process once, then we will reuse the
   //existing value for subsequent subscribers
   .PublishLast().RefCount();
}

以上将等到所有三个项目都完成后才会发出。需要注意的一件事是,在 Rx 中,Void 对象实际上是一个 System.Reactive.Unit,所以如果没有值,你应该返回它。

您可以编写自己的 Subject 来为您执行此操作,但由于多种原因通常不鼓励这样做。相反,您还可以连接任务生成的三个 Observables/Subjects。这个操作产生一个 Observable ,它发出这些任务产生的所有值,并且只有在所有输入 Observables 完成时才完成。

由于它们的类型略有不同,您需要使用 map().

更改后两个任务生成的 Observable 的签名
Observable<T> output = Observable.concat(t1, 
    t2.map(in -> null).ignoreElements(), 
    t3.map(in -> null).ignoreElements());

如果你想等待任何订阅者使用产生的值,直到所有 Observables 都完成,你可以在这个 Observable.

上调用 last() 方法
Observable<T> output = Observable.concat(t1, 
    t2.map(in -> null).ignoreElements(), 
    t3.map(in -> null).ignoreElements()).last();

不要为此使用 Subjects。而是使用异步调度程序合并可观察对象。所以你有:

T task1(S s);
void task2(S s);
void task3(S s);

然后

<S,T> Observable<T> get(S s) {
    return Observable.merge(
        Observable.just(s)
            .map(x -> task1(x))
            .subscribeOn(Schedulers.computation()),
        (Observable<T>) Observable.just(s)
            .doOnNext(x -> task2(x))
            .ignoreElements()
            .cast(Object.class)
            .subscribeOn(Schedulers.computation()),
        (Observable<T>) Observable.just(s)
            .doOnNext(x -> task3(x))
            .ignoreElements()
            .cast(Object.class)
            .subscribeOn(Schedulers.computation()))
        // wait for completion before emitting the single value
        .last();
}