使用 RxJS 一次限制请求数
Limit number of requests at a time with RxJS
假设我有 10 个 url,我想为每个 url 发出一个 HTTP 请求。
我可以创建 URL 的可观察对象,然后 .flatMap()
每个请求的请求,然后 .subscribe
的结果。但这会同时发出所有请求。
有没有办法将请求数限制在固定数量,以免服务器超载
这个问题在这里有答案:how-to-limit-the-concurrency-of-flatmap
您也可以在此处查看答案
基本上它围绕使用 merge(withMaxConcurrency)
运算符。
我遇到了同样的问题,现在找到了解决办法。
参考
如果你通过 fromNodeCallback 或 return Promise 创建 observable,它会创建一个热 observable,并且一旦 flatMap 和订阅就会立即执行。所以 flatMapWithMaxConcurrent 或 map&merge 没有按预期工作。
var start = Date.now()
var now = ()=>Date.now()-start
var test = Rx.Observable.fromNodeCallback(function(i, cb){
console.log(i, 'access', now());
setTimeout(function(){
cb(null, i)
}, 1000);
})
Rx.Observable.from([1,2,3])
.flatMapWithMaxConcurrent(1, x=>test(x) )
.subscribe(x=>console.log(x, 'finish', now()))
/* output:
1 access 2
2 access 16
3 access 16
1 finish 1016
2 finish 1016
3 finish 1017
*/
<script src="http://www.cdnjs.net/ajax/libs/rxjs/4.1.0/rx.all.js"></script>
您需要将其转换为冷可观察对象,只需使用 Rx.Observable.defer。
var start = Date.now()
var now = ()=>Date.now()-start
var test = Rx.Observable.fromNodeCallback(function(i, cb){
console.log(i, 'access', now());
setTimeout(function(){
cb(null, i)
}, 1000);
})
Rx.Observable.from([1,2,3])
.flatMapWithMaxConcurrent(1, x=>Rx.Observable.defer( ()=>test(x)) )
// .map(x=>Rx.Observable.defer( ()=>test(x)) ).merge(1) // this works too
.subscribe(x=>console.log(x, 'finish', now()))
/* output:
1 access 3
1 finish 1004
2 access 1005
2 finish 2005
3 access 2006
3 finish 3006
*/
<script src="http://www.cdnjs.net/ajax/libs/rxjs/4.1.0/rx.all.js"></script>
这里只是一个旁注:在 RxJS v5 中,fromCallback()
和 fromNodeCallback()
分别更改为 bindCallback()
和 bindNodeCallback()
。 Link 进一步阅读:bindNodeCallback
现在是 2018 年,rxjs 5 来了,我就是这样解决的
urls$
.mergeMap((url) => request({ url }), null, 10)
.subscribe()
mergeMap
(又名 flatMap
)已经将 "max concurrency" 作为其第三个参数(参见 docs)
顺便说一句。我正在使用 universal-rxjs-ajax(request
)来实现节点兼容性,但它应该与 Observable.ajax
一样工作
RxJS v6 更新
将您的并行限制作为第二个参数通过 mergeMap 进行管道处理
const MAX_PARALLEL_QUERIES = 3;
let allResults = [];
let observables = [] // fill with observables
from(observables)
.pipe(mergeMap(observable => observable, MAX_PARALLEL_QUERIES))
.subscribe(
partialResults => {
allResults = allResults.concat(partialResults);
},
err => {
// handle error
},
() => {
// get here when all obserable has returned
allResults.forEach(result=> {
// do what you want
});
}
);
假设我有 10 个 url,我想为每个 url 发出一个 HTTP 请求。
我可以创建 URL 的可观察对象,然后 .flatMap()
每个请求的请求,然后 .subscribe
的结果。但这会同时发出所有请求。
有没有办法将请求数限制在固定数量,以免服务器超载
这个问题在这里有答案:how-to-limit-the-concurrency-of-flatmap
您也可以在此处查看答案
基本上它围绕使用 merge(withMaxConcurrency)
运算符。
我遇到了同样的问题,现在找到了解决办法。
参考
如果你通过 fromNodeCallback 或 return Promise 创建 observable,它会创建一个热 observable,并且一旦 flatMap 和订阅就会立即执行。所以 flatMapWithMaxConcurrent 或 map&merge 没有按预期工作。
var start = Date.now()
var now = ()=>Date.now()-start
var test = Rx.Observable.fromNodeCallback(function(i, cb){
console.log(i, 'access', now());
setTimeout(function(){
cb(null, i)
}, 1000);
})
Rx.Observable.from([1,2,3])
.flatMapWithMaxConcurrent(1, x=>test(x) )
.subscribe(x=>console.log(x, 'finish', now()))
/* output:
1 access 2
2 access 16
3 access 16
1 finish 1016
2 finish 1016
3 finish 1017
*/
<script src="http://www.cdnjs.net/ajax/libs/rxjs/4.1.0/rx.all.js"></script>
您需要将其转换为冷可观察对象,只需使用 Rx.Observable.defer。
var start = Date.now()
var now = ()=>Date.now()-start
var test = Rx.Observable.fromNodeCallback(function(i, cb){
console.log(i, 'access', now());
setTimeout(function(){
cb(null, i)
}, 1000);
})
Rx.Observable.from([1,2,3])
.flatMapWithMaxConcurrent(1, x=>Rx.Observable.defer( ()=>test(x)) )
// .map(x=>Rx.Observable.defer( ()=>test(x)) ).merge(1) // this works too
.subscribe(x=>console.log(x, 'finish', now()))
/* output:
1 access 3
1 finish 1004
2 access 1005
2 finish 2005
3 access 2006
3 finish 3006
*/
<script src="http://www.cdnjs.net/ajax/libs/rxjs/4.1.0/rx.all.js"></script>
这里只是一个旁注:在 RxJS v5 中,fromCallback()
和 fromNodeCallback()
分别更改为 bindCallback()
和 bindNodeCallback()
。 Link 进一步阅读:bindNodeCallback
现在是 2018 年,rxjs 5 来了,我就是这样解决的
urls$
.mergeMap((url) => request({ url }), null, 10)
.subscribe()
mergeMap
(又名 flatMap
)已经将 "max concurrency" 作为其第三个参数(参见 docs)
顺便说一句。我正在使用 universal-rxjs-ajax(request
)来实现节点兼容性,但它应该与 Observable.ajax
RxJS v6 更新
将您的并行限制作为第二个参数通过 mergeMap 进行管道处理
const MAX_PARALLEL_QUERIES = 3;
let allResults = [];
let observables = [] // fill with observables
from(observables)
.pipe(mergeMap(observable => observable, MAX_PARALLEL_QUERIES))
.subscribe(
partialResults => {
allResults = allResults.concat(partialResults);
},
err => {
// handle error
},
() => {
// get here when all obserable has returned
allResults.forEach(result=> {
// do what you want
});
}
);