Angular 2 / RXJS - 需要一些帮助批处理请求
Angular 2 / RXJS - need some help batching requests
我一直在阅读 rxjs 文档,但在所有运算符中迷路了..
这是我目前得到的
let obs = Observable.from([1, 3, 5])
所以我需要做的是 take()
数组中的一些固定数量。在 post 请求中使用结果,当结果成功时,我需要重新启动该过程。我想收集所有的结果,并在进行过程中保持进度(用于进度条)
我不需要所有这些的代码。我真正需要知道的是如何使用 rxjs 拆分这个数组。发送其中的一部分,然后重新启动该过程,直到没有任何东西可以发送。
最终解决方案
var _this = this
function productsRequest(arr) {
return _this.chainableRequest('post', `reports/${clientId}/${retailerId}/`, loadedProductsReport, {
'identifiers': arr,
'realTime': true
})
}
let arrayCount = Math.ceil(identifiers.length/10)
let obs = Observable.from(identifiers)
.bufferCount(10)
.concatMap(arr => {
arrayCount--
return arrayCount > 0 ? productsRequest(arr) : Observable.empty()
})
let subscriber = obs.subscribe(
value => console.log(value)
)
父级中的可链接请求方法
chainableRequest(method: string, endpoint: string, action: Function, data = {}, callback?: Function){
let body = (<any>Object).assign({}, {
headers: this.headers
}, data)
return this._http[method.toLowerCase()](`${this.baseUri}/${endpoint}`, body, body)
.map((res: Response) => res.json())
}
这在很大程度上取决于您要实现的目标。
如果你想基于之前的某个 Observable 递归调用一个 Observable 并且你不知道你要调用它多少次然后使用 expand()
运算符。
例如,此演示根据上一次调用 (count
属性) 的响应递归创建 5 个请求:
import { Observable } from 'rxjs/Observable';
function mockPostRequest(count) {
return Observable.of(`{"count":${count},"data":"response"}`)
.map(val => JSON.parse(val));
}
Observable.of({count: 0})
.expand(response => {
console.log('Response:', response.count);
return response.count < 5 ? mockPostRequest(response.count + 1) : Observable.empty();
})
.subscribe(undefined, undefined, val => console.log('Completed'));
打印到控制台:
Response: 0
Response: 1
Response: 2
Response: 3
Response: 4
Response: 5
Completed
观看现场演示:http://plnkr.co/edit/lKNdR8oeOuB2mrnR3ahQ?p=preview
或者如果你只是想一个接一个地调用一堆HTTP请求(concatMap()
operator) or call all of them at once and consume them as they arrive (mergeMap()
运算符):
Observable.from([
'https://httpbin.org/get?1',
'https://httpbin.org/get?2',
'https://httpbin.org/get?3',
])
.concatMap(url => Observable.of(url))
.subscribe(response => console.log(response));
打印到控制台:
https://httpbin.org/get?1
https://httpbin.org/get?2
https://httpbin.org/get?3
我一直在阅读 rxjs 文档,但在所有运算符中迷路了..
这是我目前得到的
let obs = Observable.from([1, 3, 5])
所以我需要做的是 take()
数组中的一些固定数量。在 post 请求中使用结果,当结果成功时,我需要重新启动该过程。我想收集所有的结果,并在进行过程中保持进度(用于进度条)
我不需要所有这些的代码。我真正需要知道的是如何使用 rxjs 拆分这个数组。发送其中的一部分,然后重新启动该过程,直到没有任何东西可以发送。
最终解决方案
var _this = this
function productsRequest(arr) {
return _this.chainableRequest('post', `reports/${clientId}/${retailerId}/`, loadedProductsReport, {
'identifiers': arr,
'realTime': true
})
}
let arrayCount = Math.ceil(identifiers.length/10)
let obs = Observable.from(identifiers)
.bufferCount(10)
.concatMap(arr => {
arrayCount--
return arrayCount > 0 ? productsRequest(arr) : Observable.empty()
})
let subscriber = obs.subscribe(
value => console.log(value)
)
父级中的可链接请求方法
chainableRequest(method: string, endpoint: string, action: Function, data = {}, callback?: Function){
let body = (<any>Object).assign({}, {
headers: this.headers
}, data)
return this._http[method.toLowerCase()](`${this.baseUri}/${endpoint}`, body, body)
.map((res: Response) => res.json())
}
这在很大程度上取决于您要实现的目标。
如果你想基于之前的某个 Observable 递归调用一个 Observable 并且你不知道你要调用它多少次然后使用 expand()
运算符。
例如,此演示根据上一次调用 (count
属性) 的响应递归创建 5 个请求:
import { Observable } from 'rxjs/Observable';
function mockPostRequest(count) {
return Observable.of(`{"count":${count},"data":"response"}`)
.map(val => JSON.parse(val));
}
Observable.of({count: 0})
.expand(response => {
console.log('Response:', response.count);
return response.count < 5 ? mockPostRequest(response.count + 1) : Observable.empty();
})
.subscribe(undefined, undefined, val => console.log('Completed'));
打印到控制台:
Response: 0
Response: 1
Response: 2
Response: 3
Response: 4
Response: 5
Completed
观看现场演示:http://plnkr.co/edit/lKNdR8oeOuB2mrnR3ahQ?p=preview
或者如果你只是想一个接一个地调用一堆HTTP请求(concatMap()
operator) or call all of them at once and consume them as they arrive (mergeMap()
运算符):
Observable.from([
'https://httpbin.org/get?1',
'https://httpbin.org/get?2',
'https://httpbin.org/get?3',
])
.concatMap(url => Observable.of(url))
.subscribe(response => console.log(response));
打印到控制台:
https://httpbin.org/get?1
https://httpbin.org/get?2
https://httpbin.org/get?3