从 Observable 使用 startWith 创建 Observable

Creating an Observable using startWith from an Observable

我正在用另一个 Observable 的输入过滤一个 Observable - 过滤的输入来自用户。

过滤是使用 RxJS 运算符完成的 combineLatest。使用这意味着当订阅这个流时,在两个源 Observables 都发出发射之前不会发出任何值 - 我希望创建的流在创建时发出(没有任何过滤),然后再进行任何用户输入。

我想我应该使用 startWith 运算符,以便流在创建时产生发射,但我不知道如何从 Observable 中播种它。使用 Observable,因为数据来自 Firebase 并使用 FirebaseListObservable 处理。

下面是我目前正在做的事情的拼凑版本。

let tagInput = document.getElementById('tags');
let tagExclusionStream = Observable
  .fromEvent(tagInput, 'input')
  .map((e: any) => createsArrayFromInput(e.target.value));

let allTags: Observable<any[]> = getAllTags();

let filteredTags = allTags
  .combineLatest(tagExclusionStream, (tags, tagExclusions) => {
     return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
  });

// I want this to print out without needing the tagExclusionStream to emit first 
filteredTags.subscribe(tags => console.log("Tags:", tags))

请告诉我我的方法是否完全off/there是更好的方法,因为我是 RxJS 的新手。

我认为这可以解决问题:

let filteredTags = allTags
  .combineLatest(tagExclusionStream.startWith(''), (tags, tagExclusions) => {
     return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
  });

或者,如果您在不同的地方使用 tagExclusionStream,您可以这样做:

let tagExclusionStream = Observable
  .fromEvent(tagInput, 'input')
  .map((e: any) => createsArrayFromInput(e.target.value))
  .startWith('');

startWith 只是在使用 concat internally。 也一样

const startValue$ = of('start with this')

concat(startValue$, source$)

或者自己编写运算符

function startFrom<T, O extends ObservableInput<any>>(start: O): OperatorFunction<T, T | ObservedValueOf<O>> {
  return (source: Observable<T>) => concat(start, source)
}

source$.pipe(
  startFrom(startValue$)
)

确保startValue$完成

一旦 startValue$ 完成,新的 Observable 将切换到 source$。 因此,如果开始的 Observable 是一个长期存在的 Observable,例如ReplaySubject 你只关心最近的值,你必须确保你传入的 Observable 完成,例如take(1).

const { ReplaySubject, concat, of } = rxjs;
const { take } = rxjs.operators;

const source$ = of('source-1', 'source-2')
const replay = new ReplaySubject(1)
replay.next('subject-1')
replay.next('subject-2')

const startWith$ = replay.pipe(take(1)) // <-- take one item and complete
const o$ = concat(startWith$, source$)
o$.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>