使用 TypeScript 将 RxJS 运算符组合成新的运算符

Combine RxJS operators into new operator using TypeScript

我经常发现自己将相同的运算符序列添加到可观察对象中,例如

observable$
  .do(x => console.log('some text', x))
  .publishReplay()
  .refCount();

我正在寻找一种方法将这 3 个运算符组合成一个小型可重用运算符(例如 .cache('some text')),我可以将其链接到任何可观察对象。我如何在 Typescript 中定义它,以便我可以导入 rxjs/Observable 和这个运算符,就像我使用 rxjs 运算符一样?

要实现您描述的运算符,请创建一个包含以下内容的 cache.ts 文件:

import { Observable } from "rxjs/Observable";
import "rxjs/add/operator/do";
import "rxjs/add/operator/publishReplay";

// Compose the operator:

function cache<T>(this: Observable<T>, text: string): Observable<T> {
  return this
    .do(x => console.log(text, x))
    .publishReplay()
    .refCount();
}

// Add the operator to the Observable prototype:

Observable.prototype.cache = cache;

// Extend the TypeScript interface for Observable to include the operator:

declare module "rxjs/Observable" {
  interface Observable<T> {
    cache: typeof cache;
  }
}

然后这样消费:

import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/of";
import "./cache";

let cached = Observable.of(1).cache("some text");
cached.subscribe(x => console.log(x));

cartant 上面的回答很好,并回答了所问的问题(我如何在 Typescript 中定义它,以便我可以导入 rxjs/Observable 和这个运算符,就像我对 rxjs 运算符所做的那样?)

我最近发现了 let 运算符,如果您实际上不需要将函数实现为 operator, 仍然会让你干掉你的代码。

我开始实施 angular 2 服务来与我的 rails 后端交互,并且知道我的大多数 api 调用看起来非常相似,所以我想尝试将尽可能多的常用内容放入函数中。

几乎所有调用都会执行以下操作:

  1. 出现错误时重试(我下面的函数需要在这方面做更多的工作)
  2. 将 http 响应映射到本地定义的 typescript class(通过 json-typescript-mapper
  3. 处理错误

这是我通过 rxjs let 运算符通过通用函数 (handleResponse) 使用 let 运算符对我的 http 响应的示例。

  handleResponse<T>({klass, retries=0} :{klass:any,retries?:number }) : (source: Observable<Response>) => Observable<T> {
    return (source: Observable<Response>)  : Observable<T> => {
      return source.retry(retries)
            .map( (res) => this.processResponse(klass,res))
            .catch( (res) => this.handleError(res));
    } 
  } 

  processResponse(klass, response: Response) {
    return deserialize(klass, response.json());
  }

  handleError(res: Response) {
    const error = new RailsBackendError(res.status, res.statusText);
    return  Observable.throw(error);
  }

  getUserList({page=1,perPage=30,retry=0}: { page?:number, perPage?:number, retry?:number }={}) : Observable<UserList> {
    const requestURL = `/api/v1/users/?${this.apiTokenQueryString}&page=${page}&per_page=${perPage}`;
    return this.http.get(requestURL).let(this.handleResponse<UserList>({klass: UserList}));
  }