Angular 2 Http、Observables 和递归请求
Angular 2 Http, Observables and recursive requests
我有一个 REST 端点,它 return 是一个项目列表,一次最多 1000 个项目。如果有超过 1000 个项目,响应的 HTTP 状态为 206,并且有一个 Next-Range
header,我可以在下一个获取更多项目的请求中使用它。
我正在开发 Angular 2 应用程序 并尝试使用 Http
和 Observable
来实现它。我的问题是我不知道 如何合并多个 Observable
s 取决于有多少页的项目和 最后 return我的组件可以订阅一个 Observable
。
这是我当前 TypeScript 实现的位置:
// NOTE: Non-working example!
getAllItems(): Observable<any[]> {
// array of all items, possibly received with multiple requests
const allItems: any[] = [];
// inner function for getting a range of items
const getRange = (range?: string) => {
const headers: Headers = new Headers();
if (range) {
headers.set('Range', range);
}
return this.http.get('http://api/endpoint', { headers })
.map((res: Response) => {
// add all to received items
// (maybe not needed if the responses can be merged some other way?)
allItems.push.apply(allItems, res.json());
// partial content
if (res.status === 206) {
const nextRange = res.headers.get('Next-Range');
// get next range of items
return getRange(nextRange);
}
return allItems;
});
};
// get first range
return getRange();
}
但是,这不起作用。如果我理解正确,Observable
被 returned 作为初始值 Observable
而不是项目数组。
您可以使用扩展运算符来实现它。您真正想要做的是创建一个递归平面图。这正是创建运算符 expand 的目的。
这是其工作原理的代码片段:
let times = true;
// This is a mock method for your http.get call
const httpMock = () => {
if(times) {
times = false;
return Rx.Observable.of({items: ["1", "2", "3"], next: true});
} else {
return Rx.Observable.of({items: ["4", "5", "6"], next: false});
}
}
httpMock()
.expand(obj => {
// In your case, the obj will be the response
// implement your logic here if the 206 http header is found
if(obj.next) {
// If you have next values, just call the http.get method again
// In my example it's the httpMock
return httpMock();
} else {
return Rx.Observable.empty();
}
})
.map(obj => obj.items.flatMap(array => array))
.reduce((acc, x) => acc.concat(x), []);
.subscribe((val) => console.log(val));
所做的是模拟第一个 http 请求,该请求的 'next' 属性 为真。这与您的 206 header 匹配。然后我们进行第二次调用,其中 'next' 属性 为 false。
结果是一个包含两个请求结果的数组。由于扩展运算符,它也适用于更多请求。
可在此处找到有效的 jsbin 示例:http://jsbin.com/wowituluqu/edit?js,console
编辑:已更新以使用 http 调用,该调用 return 是数组中的数组,最终结果是包含数组中所有元素的单个数组。
如果您希望得到一个包含来自请求的单独数组的结果数组,只需删除平面图并直接 return 项目。在这里更新codepen:
http://codepen.io/anon/pen/xRZyaZ?editors=0010#0
我对 KwintenP 的示例做了一些小的调整:
// service.ts
getAllItems(): Observable<any[]> {
const getRange = (range?: string): Observable<any> => {
const headers: Headers = new Headers();
if (range) {
headers.set('Range', range);
}
return this.http.get('http://api/endpoint', { headers });
};
return getRange().expand((res: Response) => {
if (res.status === 206) {
const nextRange = res.headers.get('Next-Range');
return getRange(nextRange);
} else {
return Observable.empty();
}
}).map((res: Response) => res.json());
}
在订阅 Observable
的组件中,我必须添加一个完整的处理程序:
// component.ts
const temp = [];
service.getAllItems().subscribe(
items => {
// page received, push items to temp
temp.push.apply(temp, items);
},
err => {
// handle error
},
() => {
// completed, expose temp to component
this.items = temp;
}
);
上面的回答很有用。我不得不以递归方式使用分页 API 获取数据,并创建了 code snippet
计算阶乘。
在最新版本中,angular 6+(自行响应 returns JSON),RxJs 6+(以管道方式使用运算符)。
getAllItems(): Observable<any[]> {
const getRange = (range?: string): Observable<any> => {
const headers: Headers = new Headers();
if (range) {
headers.set('Range', range);
}
return this.http.get('http://api/endpoint', { headers });
};
return getRange().pipe(expand((res: Response) => {
if (res['status'] === 206) {
const nextRange = res['headers'].get('Next-Range');
return getRange(nextRange);
} else {
return EMPTY;
}
}));
}
以防万一其他人遇到这个问题。我正在使用的模式使用相同的扩展概念。然而,当您需要将来自服务器的响应转换为不同类型的 Observable
时,这确实是 'complete' 示例,例如上面的 Visa Kopu 示例。
我把每个 'step' 都分解了,所以流程是在方法中捕获的(而不是编写它的最紧凑版本)。我觉得这样比较好学
import {Injectable} from '@angular/core';
import {HttpClient, HttpParams, HttpResponse} from '@angular/common/http';
import {EMPTY, Observable} from 'rxjs';
import {expand, map} from 'rxjs/operators';
// this service is consuming a backend api that is calling/proxying a Salesforce query that is paginated
@Injectable({providedIn: 'root'})
export class ExampleAccountService {
constructor(protected http: HttpClient) {
}
// this method maps the 'pages' of AccountsResponse objects to a single Observable array of Account objects
allAccounts(): Observable<Account[]> {
const accounts: Account[] = [];
return this.aPageOfAccounts(null).pipe(
map((ret: HttpResponse<AccountsResponse>) => {
for (const account of ret.body.accounts) {
accounts.push(account);
}
return accounts;
})
);
}
// recursively fetch pages of accounts until there are no more pages
private aPageOfAccounts(page): Observable<HttpResponse<AccountsResponse>> {
return this.fetchAccountsFromServer(page).pipe(
expand((res: HttpResponse<AccountsResponse>) => {
if (res.body.nextRecordsUrl) {
return this.aPageOfAccounts(res.body.nextRecordsUrl);
} else {
return EMPTY;
}
}));
}
// this one does the actual fetch to the server
private fetchAccountsFromServer(page: string): Observable<HttpResponse<AccountsResponse>> {
const options = createRequestOption({page});
return this.http.get<AccountsResponse>(`https://wherever.com/accounts/page`,
{params: options, observe: 'response'});
}
}
export class AccountsResponse {
constructor(public totalSize?: number,
public done?: boolean,
public nextRecordsUrl?: string,
public accounts?: Account[]) {
}
}
export class Account {
constructor(public id?: string,
public name?: string
) {
}
}
export const createRequestOption = (req?: any): HttpParams => {
let options: HttpParams = new HttpParams();
if (req) {
Object.keys(req).forEach((key) => {
if (key !== 'sort') {
options = options.set(key, req[key]);
}
});
if (req.sort) {
req.sort.forEach((val) => {
options = options.append('sort', val);
});
}
}
return options;
};
我有一个 REST 端点,它 return 是一个项目列表,一次最多 1000 个项目。如果有超过 1000 个项目,响应的 HTTP 状态为 206,并且有一个 Next-Range
header,我可以在下一个获取更多项目的请求中使用它。
我正在开发 Angular 2 应用程序 并尝试使用 Http
和 Observable
来实现它。我的问题是我不知道 如何合并多个 Observable
s 取决于有多少页的项目和 最后 return我的组件可以订阅一个 Observable
。
这是我当前 TypeScript 实现的位置:
// NOTE: Non-working example!
getAllItems(): Observable<any[]> {
// array of all items, possibly received with multiple requests
const allItems: any[] = [];
// inner function for getting a range of items
const getRange = (range?: string) => {
const headers: Headers = new Headers();
if (range) {
headers.set('Range', range);
}
return this.http.get('http://api/endpoint', { headers })
.map((res: Response) => {
// add all to received items
// (maybe not needed if the responses can be merged some other way?)
allItems.push.apply(allItems, res.json());
// partial content
if (res.status === 206) {
const nextRange = res.headers.get('Next-Range');
// get next range of items
return getRange(nextRange);
}
return allItems;
});
};
// get first range
return getRange();
}
但是,这不起作用。如果我理解正确,Observable
被 returned 作为初始值 Observable
而不是项目数组。
您可以使用扩展运算符来实现它。您真正想要做的是创建一个递归平面图。这正是创建运算符 expand 的目的。
这是其工作原理的代码片段:
let times = true;
// This is a mock method for your http.get call
const httpMock = () => {
if(times) {
times = false;
return Rx.Observable.of({items: ["1", "2", "3"], next: true});
} else {
return Rx.Observable.of({items: ["4", "5", "6"], next: false});
}
}
httpMock()
.expand(obj => {
// In your case, the obj will be the response
// implement your logic here if the 206 http header is found
if(obj.next) {
// If you have next values, just call the http.get method again
// In my example it's the httpMock
return httpMock();
} else {
return Rx.Observable.empty();
}
})
.map(obj => obj.items.flatMap(array => array))
.reduce((acc, x) => acc.concat(x), []);
.subscribe((val) => console.log(val));
所做的是模拟第一个 http 请求,该请求的 'next' 属性 为真。这与您的 206 header 匹配。然后我们进行第二次调用,其中 'next' 属性 为 false。
结果是一个包含两个请求结果的数组。由于扩展运算符,它也适用于更多请求。
可在此处找到有效的 jsbin 示例:http://jsbin.com/wowituluqu/edit?js,console
编辑:已更新以使用 http 调用,该调用 return 是数组中的数组,最终结果是包含数组中所有元素的单个数组。
如果您希望得到一个包含来自请求的单独数组的结果数组,只需删除平面图并直接 return 项目。在这里更新codepen: http://codepen.io/anon/pen/xRZyaZ?editors=0010#0
我对 KwintenP 的示例做了一些小的调整:
// service.ts
getAllItems(): Observable<any[]> {
const getRange = (range?: string): Observable<any> => {
const headers: Headers = new Headers();
if (range) {
headers.set('Range', range);
}
return this.http.get('http://api/endpoint', { headers });
};
return getRange().expand((res: Response) => {
if (res.status === 206) {
const nextRange = res.headers.get('Next-Range');
return getRange(nextRange);
} else {
return Observable.empty();
}
}).map((res: Response) => res.json());
}
在订阅 Observable
的组件中,我必须添加一个完整的处理程序:
// component.ts
const temp = [];
service.getAllItems().subscribe(
items => {
// page received, push items to temp
temp.push.apply(temp, items);
},
err => {
// handle error
},
() => {
// completed, expose temp to component
this.items = temp;
}
);
上面的回答很有用。我不得不以递归方式使用分页 API 获取数据,并创建了 code snippet 计算阶乘。
在最新版本中,angular 6+(自行响应 returns JSON),RxJs 6+(以管道方式使用运算符)。
getAllItems(): Observable<any[]> {
const getRange = (range?: string): Observable<any> => {
const headers: Headers = new Headers();
if (range) {
headers.set('Range', range);
}
return this.http.get('http://api/endpoint', { headers });
};
return getRange().pipe(expand((res: Response) => {
if (res['status'] === 206) {
const nextRange = res['headers'].get('Next-Range');
return getRange(nextRange);
} else {
return EMPTY;
}
}));
}
以防万一其他人遇到这个问题。我正在使用的模式使用相同的扩展概念。然而,当您需要将来自服务器的响应转换为不同类型的 Observable
时,这确实是 'complete' 示例,例如上面的 Visa Kopu 示例。
我把每个 'step' 都分解了,所以流程是在方法中捕获的(而不是编写它的最紧凑版本)。我觉得这样比较好学
import {Injectable} from '@angular/core';
import {HttpClient, HttpParams, HttpResponse} from '@angular/common/http';
import {EMPTY, Observable} from 'rxjs';
import {expand, map} from 'rxjs/operators';
// this service is consuming a backend api that is calling/proxying a Salesforce query that is paginated
@Injectable({providedIn: 'root'})
export class ExampleAccountService {
constructor(protected http: HttpClient) {
}
// this method maps the 'pages' of AccountsResponse objects to a single Observable array of Account objects
allAccounts(): Observable<Account[]> {
const accounts: Account[] = [];
return this.aPageOfAccounts(null).pipe(
map((ret: HttpResponse<AccountsResponse>) => {
for (const account of ret.body.accounts) {
accounts.push(account);
}
return accounts;
})
);
}
// recursively fetch pages of accounts until there are no more pages
private aPageOfAccounts(page): Observable<HttpResponse<AccountsResponse>> {
return this.fetchAccountsFromServer(page).pipe(
expand((res: HttpResponse<AccountsResponse>) => {
if (res.body.nextRecordsUrl) {
return this.aPageOfAccounts(res.body.nextRecordsUrl);
} else {
return EMPTY;
}
}));
}
// this one does the actual fetch to the server
private fetchAccountsFromServer(page: string): Observable<HttpResponse<AccountsResponse>> {
const options = createRequestOption({page});
return this.http.get<AccountsResponse>(`https://wherever.com/accounts/page`,
{params: options, observe: 'response'});
}
}
export class AccountsResponse {
constructor(public totalSize?: number,
public done?: boolean,
public nextRecordsUrl?: string,
public accounts?: Account[]) {
}
}
export class Account {
constructor(public id?: string,
public name?: string
) {
}
}
export const createRequestOption = (req?: any): HttpParams => {
let options: HttpParams = new HttpParams();
if (req) {
Object.keys(req).forEach((key) => {
if (key !== 'sort') {
options = options.set(key, req[key]);
}
});
if (req.sort) {
req.sort.forEach((val) => {
options = options.append('sort', val);
});
}
}
return options;
};