RxJava / RxJs:如何合并两个源可观察对象但在其中一个完成后立即完成

RxJava / RxJs: How to merge two source observables but complete as soon as one of them completes

我有两个源可观察量。 我想合并两个源 observable,但是合并的 observable 应该在其中一个源 observable 完成后立即完成。

期望的行为:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4--x

如果其中一个来源出现错误,错误应该传播到合并的可观察对象:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged"  ---1---2----3--4--ex

"merge" 运算符仅在两个源都已完成时才完成合并流:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4-----------------------------x

我怎样才能达到我想要的行为?

我当然希望其他人用更优雅的方法来回答,但这很有效。

我认为您必须使用 take 运算符之一。您可以在一个源完成时完成所有源,如下所示:

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
Rx.Observable.merge(a.takeUntil(b.last()), b.takeUntil(a.last()))
  .subscribe(
    x => { console.log('next', x); },
    null,
    () => { console.log('complete'); }
  );
  
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

或者可读性较差但可扩展性更强的版本:

function merge(...obs) {
  return Rx.Observable.merge(...obs.map(x => x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))));
}

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);

merge(a, b)
  .subscribe(
    x => { console.log('next', x); },
    null,
    () => { console.log('complete'); }
  );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

这是错误传播的示例:

function merge(...obs) {
  return Rx.Observable.merge(...obs.map(x => x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))));
}

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
const c = Rx.Observable.timer(2200).map(x => { throw 'oops!'; });

merge(a, b, c)
  .subscribe(
    x => { console.log('next', x); },
    x => { console.log('error', x); },
    () => { console.log('complete'); }
  );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

在合并的外部使用 takeUntil 很棘手,因为您会丢失最后发出的值。

您需要处理元数据,即有关每个可观察对象的信息。为此,请在每个流上使用 materialize() 运算符,并在合并流上使用 dematerialize() 以实际发出数据。

Observable.merge( observableA.materialize(),
                  observableB.materialize() )
  .takeWhile( notification -> notification.hasValue() )
  .dematerialize()
  .subscribe( ... );

这将合并两个可观察对象,直到其中一个完成或发出错误。

当一个 observable 完成时,它不会发出值,但我们可以 concat 它与另一个发出单个值的 'signal' observable。然后我们可以使用 takeWhile 运算符观察 'signal' observable 的值。

当然你必须确保 'signal' observable 的发射值不是被合并的 observable 发射的值 - 如果 takeWhile 谓词通过引用进行比较。

这是一个例子:

const obs1$ = Rx.Observable.interval(1000)
    .map(x => `obs1: ${x}`)
    .take(5);

const obs2$ = Rx.Observable.interval(300)
    .map(x => `obs2: ${x}`)
    .take(9);

const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);

Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
    .takeWhile(x => x !== signalFinishMessage)
    .subscribe(
        x => console.log(x),
        err => console.log('received error:', err),
        () => console.log('complete')
    );
    
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

错误也会传播:

const obs1$ = Rx.Observable.interval(1000)
    .map(x => `obs1: ${x}`)
    .take(5);

const obs2$ = Rx.Observable.interval(300)
    .map(x => `obs2: ${x}`)
    .take(9)
    .concat(Rx.Observable.throw(`the world's about to end`));

const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);

Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
    .takeWhile(x => x !== signalFinishMessage)
    .subscribe(
        x => console.log(x),
        err => console.log('received error:', err),
        () => console.log('complete')
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

我最后自己动手了:

import { Observable } from 'rxjs';

export function whileAll<T>(...observables: Observable<T>[]): Observable<T> {
  return new Observable<T>(function (observer) {
    if (observables.length === 0)
      observer.complete();
    else {
      const next = observer.next.bind(observer);
      const error = observer.error.bind(observer);
      const complete = observer.complete.bind(observer);
      for (let i = 0; i < observables.length; i++)
        observer.add(observables[i].subscribe(next, error, complete));
    }
  });
}