使用 RxJS 一次限制请求数

Limit number of requests at a time with RxJS

假设我有 10 个 url,我想为每个 url 发出一个 HTTP 请求。

我可以创建 URL 的可观察对象,然后 .flatMap() 每个请求的请求,然后 .subscribe 的结果。但这会同时发出所有请求。

有没有办法将请求数限制在固定数量,以免服务器超载

这个问题在这里有答案:how-to-limit-the-concurrency-of-flatmap 您也可以在此处查看答案

基本上它围绕使用 merge(withMaxConcurrency) 运算符。

我遇到了同样的问题,现在找到了解决办法。 参考

如果你通过 fromNodeCallback 或 return Promise 创建 observable,它会创建一个热 observable,并且一旦 flatMap 和订阅就会立即执行。所以 flatMapWithMaxConcurrent 或 map&merge 没有按预期工作。

var start = Date.now()
var now = ()=>Date.now()-start
var test = Rx.Observable.fromNodeCallback(function(i, cb){
 console.log(i, 'access', now());
 setTimeout(function(){
  cb(null, i)
 }, 1000);
})

Rx.Observable.from([1,2,3])
 .flatMapWithMaxConcurrent(1, x=>test(x) )
 .subscribe(x=>console.log(x, 'finish', now()))

/* output:
1 access 2
2 access 16
3 access 16
1 finish 1016
2 finish 1016
3 finish 1017
*/
<script src="http://www.cdnjs.net/ajax/libs/rxjs/4.1.0/rx.all.js"></script>

您需要将其转换为冷可观察对象,只需使用 Rx.Observable.defer。

var start = Date.now()
var now = ()=>Date.now()-start
var test = Rx.Observable.fromNodeCallback(function(i, cb){
 console.log(i, 'access', now());
 setTimeout(function(){
  cb(null, i)
 }, 1000);
})


Rx.Observable.from([1,2,3])
 .flatMapWithMaxConcurrent(1, x=>Rx.Observable.defer( ()=>test(x)) )
// .map(x=>Rx.Observable.defer( ()=>test(x)) ).merge(1) // this works too
 .subscribe(x=>console.log(x, 'finish', now()))
    
/* output:
1 access 3
1 finish 1004
2 access 1005
2 finish 2005
3 access 2006
3 finish 3006
*/
<script src="http://www.cdnjs.net/ajax/libs/rxjs/4.1.0/rx.all.js"></script>

这里只是一个旁注:在 RxJS v5 中,fromCallback()fromNodeCallback() 分别更改为 bindCallback()bindNodeCallback()。 Link 进一步阅读:bindNodeCallback

现在是 2018 年,rxjs 5 来了,我就是这样解决的

urls$
  .mergeMap((url) => request({ url }), null, 10)
  .subscribe()

mergeMap(又名 flatMap)已经将 "max concurrency" 作为其第三个参数(参见 docs

顺便说一句。我正在使用 universal-rxjs-ajax(request)来实现节点兼容性,但它应该与 Observable.ajax

一样工作

RxJS v6 更新

将您的并行限制作为第二个参数通过 mergeMap 进行管道处理

const MAX_PARALLEL_QUERIES = 3;
let allResults = [];
let observables = [] // fill with observables
from(observables)
            .pipe(mergeMap(observable => observable, MAX_PARALLEL_QUERIES))
            .subscribe(
                partialResults => {
                    allResults = allResults.concat(partialResults);
                },
                err => {
                    // handle error
                },
                () => {
                    // get here when all obserable has returned
                    allResults.forEach(result=> {
                        // do what you want
                    });
                }
            );