使用 TypeScript 将 RxJS 运算符组合成新的运算符
Combine RxJS operators into new operator using TypeScript
我经常发现自己将相同的运算符序列添加到可观察对象中,例如
observable$
.do(x => console.log('some text', x))
.publishReplay()
.refCount();
我正在寻找一种方法将这 3 个运算符组合成一个小型可重用运算符(例如 .cache('some text')
),我可以将其链接到任何可观察对象。我如何在 Typescript 中定义它,以便我可以导入 rxjs/Observable 和这个运算符,就像我使用 rxjs 运算符一样?
要实现您描述的运算符,请创建一个包含以下内容的 cache.ts
文件:
import { Observable } from "rxjs/Observable";
import "rxjs/add/operator/do";
import "rxjs/add/operator/publishReplay";
// Compose the operator:
function cache<T>(this: Observable<T>, text: string): Observable<T> {
return this
.do(x => console.log(text, x))
.publishReplay()
.refCount();
}
// Add the operator to the Observable prototype:
Observable.prototype.cache = cache;
// Extend the TypeScript interface for Observable to include the operator:
declare module "rxjs/Observable" {
interface Observable<T> {
cache: typeof cache;
}
}
然后这样消费:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/of";
import "./cache";
let cached = Observable.of(1).cache("some text");
cached.subscribe(x => console.log(x));
cartant 上面的回答很好,并回答了所问的问题(我如何在 Typescript 中定义它,以便我可以导入 rxjs/Observable 和这个运算符,就像我对 rxjs 运算符所做的那样?)
我最近发现了 let
运算符,如果您实际上不需要将函数实现为 operator, 仍然会让你干掉你的代码。
我开始实施 angular 2 服务来与我的 rails 后端交互,并且知道我的大多数 api 调用看起来非常相似,所以我想尝试将尽可能多的常用内容放入函数中。
几乎所有调用都会执行以下操作:
- 出现错误时重试(我下面的函数需要在这方面做更多的工作)
- 将 http 响应映射到本地定义的 typescript class(通过 json-typescript-mapper)
- 处理错误
这是我通过 rxjs let 运算符通过通用函数 (handleResponse) 使用 let
运算符对我的 http 响应的示例。
handleResponse<T>({klass, retries=0} :{klass:any,retries?:number }) : (source: Observable<Response>) => Observable<T> {
return (source: Observable<Response>) : Observable<T> => {
return source.retry(retries)
.map( (res) => this.processResponse(klass,res))
.catch( (res) => this.handleError(res));
}
}
processResponse(klass, response: Response) {
return deserialize(klass, response.json());
}
handleError(res: Response) {
const error = new RailsBackendError(res.status, res.statusText);
return Observable.throw(error);
}
getUserList({page=1,perPage=30,retry=0}: { page?:number, perPage?:number, retry?:number }={}) : Observable<UserList> {
const requestURL = `/api/v1/users/?${this.apiTokenQueryString}&page=${page}&per_page=${perPage}`;
return this.http.get(requestURL).let(this.handleResponse<UserList>({klass: UserList}));
}
我经常发现自己将相同的运算符序列添加到可观察对象中,例如
observable$
.do(x => console.log('some text', x))
.publishReplay()
.refCount();
我正在寻找一种方法将这 3 个运算符组合成一个小型可重用运算符(例如 .cache('some text')
),我可以将其链接到任何可观察对象。我如何在 Typescript 中定义它,以便我可以导入 rxjs/Observable 和这个运算符,就像我使用 rxjs 运算符一样?
要实现您描述的运算符,请创建一个包含以下内容的 cache.ts
文件:
import { Observable } from "rxjs/Observable";
import "rxjs/add/operator/do";
import "rxjs/add/operator/publishReplay";
// Compose the operator:
function cache<T>(this: Observable<T>, text: string): Observable<T> {
return this
.do(x => console.log(text, x))
.publishReplay()
.refCount();
}
// Add the operator to the Observable prototype:
Observable.prototype.cache = cache;
// Extend the TypeScript interface for Observable to include the operator:
declare module "rxjs/Observable" {
interface Observable<T> {
cache: typeof cache;
}
}
然后这样消费:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/of";
import "./cache";
let cached = Observable.of(1).cache("some text");
cached.subscribe(x => console.log(x));
cartant 上面的回答很好,并回答了所问的问题(我如何在 Typescript 中定义它,以便我可以导入 rxjs/Observable 和这个运算符,就像我对 rxjs 运算符所做的那样?)
我最近发现了 let
运算符,如果您实际上不需要将函数实现为 operator, 仍然会让你干掉你的代码。
我开始实施 angular 2 服务来与我的 rails 后端交互,并且知道我的大多数 api 调用看起来非常相似,所以我想尝试将尽可能多的常用内容放入函数中。
几乎所有调用都会执行以下操作:
- 出现错误时重试(我下面的函数需要在这方面做更多的工作)
- 将 http 响应映射到本地定义的 typescript class(通过 json-typescript-mapper)
- 处理错误
这是我通过 rxjs let 运算符通过通用函数 (handleResponse) 使用 let
运算符对我的 http 响应的示例。
handleResponse<T>({klass, retries=0} :{klass:any,retries?:number }) : (source: Observable<Response>) => Observable<T> {
return (source: Observable<Response>) : Observable<T> => {
return source.retry(retries)
.map( (res) => this.processResponse(klass,res))
.catch( (res) => this.handleError(res));
}
}
processResponse(klass, response: Response) {
return deserialize(klass, response.json());
}
handleError(res: Response) {
const error = new RailsBackendError(res.status, res.statusText);
return Observable.throw(error);
}
getUserList({page=1,perPage=30,retry=0}: { page?:number, perPage?:number, retry?:number }={}) : Observable<UserList> {
const requestURL = `/api/v1/users/?${this.apiTokenQueryString}&page=${page}&per_page=${perPage}`;
return this.http.get(requestURL).let(this.handleResponse<UserList>({klass: UserList}));
}