将数组传播到参数中,似乎不适用于 Observable.pipe(...myOperatorsArray)

Spreading arrays into arguments, doesn't seem to work for Observable.pipe(...myOperatorsArray)

我们创造

ob.pipe(map(),map(),map())

一直以来,应该可以

let myMaps = [map(),map(),map()]

然后稍后调用

ob.pipe(myMaps)

不知何故。

这是我在打字稿中的用例的简化版本

declare type InterceptorFunc = (m: Message) => Observable < Message >

  export function interceptorMap(interceptorFunc: InterceptorFunc):
  (source: Observable < Message > ) => Observable < Message > {
    console.log("interceptorMap") //[1]never see this in the console
    return (source: Observable < Message > ) => source.pipe(flatMap(e => {
      console.log("interceptorMap: msg: ", e) //[2]nor this
      return interceptorFunc(e);
    }));
  }

addInterceptor(interceptor: Function /*InterceptorFunc*/ ) {
  this.interceptorFuncArray.push( < InterceptorFunc > interceptor)
}

/**
 * Apply interceptors
 */
//message intercept

( < any > this.preReplayEventsSubject.asObservable()).pipe(
    ...(this.interceptorFuncArray.map(f => interceptorMap(f))),
    map((m: Message) => {
      console.log("Message after apply interceptors: ", m) //[3]see this in the console and the message appears like it has never flowed through any interceptor Funcs in the array
      return m;
    }))
  .subscribe((m: Message) => {//[4]
    this.replayEventsSubject.next(m);
  });

所以在注释 [1] 和 [2] 中,自定义运算符中的 console.log 语句永远不会执行。在注释 [3] 中,在运算符中更改的 Message 对象未更改,但执行此 console.log 语句的事实表明事件必须通过传播数组才能到达那里。

我尝试定义一个 returns null 的 InterceptorFunc,然后将其插入过滤器(e => !!)注释 [3] 和 [4] 代码仍然在具有 null 事件的过滤器应该运行的地方运行完全停止该 Message 对象的流程。

谁能解释为什么会这样?

我想重要的是你可以 give customized a functionpipeoperator :

obs.pipe(
 (observable) => doSomething(observable, ...),
 otherOperators(callbackFunction)
)

这是一个极简主义的代码示例,可以回答应该适合您的问题:

// import rxjs
const {of} = rxjs;
const {map} = rxjs.operators;


/*
  apply a chain of `map` operations
*/
function mapAll(obs, operations) {
  return obs.pipe(
    map(v => {
      return operations.reduce((accVal, currentOp) => {
        return currentOp(accVal)
      }, v)
    })
  )
}


let result = [];
let observable = of(result);

// The idea here is to verify that each operation pushes into result
let op1 = (r) => {r.push(1); return r};
let op2 = (r) => {r.push(2); return r};
let op3 = (r) => {r.push(3); return r};

let operations = [op1, op2, op3];

observable.pipe(
  (obs) => mapAll(obs, operations)
)
  .subscribe(res => console.log(res));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.1/rxjs.umd.js"></script>

来自 RxJs < 5.5

let 运算符已被替换为 pipe,它们具有相同的签名。我想您可以通过 `let̀ 找到更多关于如何编写自定义运算符的 examples