rxjs - 使用布尔可观察流值过滤流事件

rxjs - Filter stream events with boolean observable stream value

this.mainObservable$.pipe(
    iWantThisOperator(() => !this.theLatestObservableBooleanValue$)
).subscribe(() => {
    console.log('Stream Event')
})

“这不是实际代码,我只是想描述一下”

我希望可以处理的运算符根据最新的可观察流布尔值过滤我的 mainObservable 事件

所以只有当 LatestObservableBooleanValue$ 为 false 时,我才需要所有 mainObservable 事件

您可以使用 withLatestFrom 来引用来自另一个可观察源的最新发射值。然后,简单地 filter 基于其他值,然后 map 回到“主要”值:

this.mainObservable$.pipe(
    withLatestFrom(this.theLatestObservableBooleanValue$),
    filter(([main, boolean]) => !boolean),
    map(([main]) => main)
  ).subscribe(n => {
      console.log(`Stream Event ${n}`)
  })

这是一个有效的 StackBlitz

如果您想将其变成您自己的自定义运算符,它可能看起来像这样:

function iWantThisOperator(boolean$: Observable<boolean>) {
  return function<T>(source$: Observable<T>) {
    return source$.pipe(
      withLatestFrom(boolean$),
      filter(([, boolean]) => !boolean),
      map(([source]) => source)
    )
  }
}

mainObservable$.pipe(
    iWantThisOperator(theLatestObservableBooleanValue$)
).subscribe(() => {
    console.log('Stream Event')
})

switchMap运算符

如果您只需要一个(或两个)地方的解决方案,请使用此选项

const { Subject } = rxjs;
const { filter, switchMap } = rxjs.operators;

const condition$ = new Subject();
const main$ = new Subject();

const resultWithoutOperator$ = condition$.pipe(
  switchMap(condition => main$.pipe(filter(() => !condition)))
);

resultWithoutOperator$.subscribe(v =>
  console.log("#resultWithoutOperator$: ", v)
);

condition$.next(false);
main$.next(1);
condition$.next(true);
main$.next(2);
condition$.next(false);
main$.next(3);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

自定义运算符

如果您多次需要您的行为,请使用此选项

const { Subject } = rxjs;
const { filter, switchMap } = rxjs.operators;

const condition$ = new Subject();
const main$ = new Subject();

function filterOperator(condition$) {
  return source$ => {
    return condition$.pipe(
      switchMap(condition => source$.pipe(filter(() => !condition)))
    );
  };
}

const resultWithOperator$ = main$.pipe(filterOperator(condition$));

resultWithOperator$.subscribe(v => console.log("#resultWithOperator$: ", v));

condition$.next(false);
main$.next(1);
condition$.next(true);
main$.next(2);
condition$.next(false);
main$.next(3);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

参数为过滤函数的自定义算子

如果你多次需要你的行为并且想要硬核(或者想抽象过滤器功能),请使用这个

const { Subject } = rxjs;
const { filter, switchMap } = rxjs.operators;

const condition$ = new Subject();
const main$ = new Subject();

function filterOperator(condition$, fn) {
  return source$ => {
    return condition$.pipe(
      switchMap(condition => source$.pipe(filter(() => fn(condition))))
    );
  };
}

const resultWithOperator$ = main$.pipe(filterOperator(condition$, v => !v));
const resultWithOperatorReverse$ = main$.pipe(filterOperator(condition$, v => v));

resultWithOperator$.subscribe(v => console.log("#resultWithOperator$: ", v));
resultWithOperatorReverse$.subscribe(v => console.log("#resultWithOperatorReverse$: ", v));

condition$.next(false);
main$.next(1);
condition$.next(true);
main$.next(2);
condition$.next(false);
main$.next(3);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>