rxjs 进行的限速 http 调用

Rate limiting http calls made by rxjs

我正在编写一项服务,人们可以在其中粘贴 Spotify 播放列表中的 urls,然后将播放列表导出到不同的服务中。对于粘贴在请求中的每个曲目 url 需要向 Spotify api.

此代码:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .flatMap(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                    catch( ( error ) => this.handleError( error ) ))
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );
  1. 从 ITrackIdentifiers 列表中创建一个 observable
  2. 采用轨道标识符的 id 来创建字符串 (ids) 的可观察值
  3. 删除列表中的所有重复 ID
  4. 为 spotify 的每个 http 调用创建一个可观察对象(并捕获错误)
  5. 使用 flatmap
  6. 将所有这些 observables 的结果合并到一个流中

除了添加大量曲目外,这实际上工作正常。我的一个示例播放列表有超过 500 首曲目,因此立即进行了 500 次调用,浏览器需要处理它们/缓存中的 return 项,因此浏览器速度很慢并锁定并识别 return 的负载超过 api 调用限制时的错误数。

我希望同时只能调用 10 个 运行。 Merge with maxConcurrent set seems like the perfect solution as discussed on Whosebug.

这看起来像这样:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .map(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                    catch( ( error ) => this.handleError( error ) ))
            .merge(10)
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );

但这就是行不通。在 chrome 网络调试器中,您可以看到所有同时进行的调用,并且大部分都在排队等待很长时间,直到它们失败。

为什么这不起作用?我还能如何解决这个问题?

现阶段项目Github checkin如下:

我设法让它按我想要的方式工作,但我仍然很好奇为什么合并不起作用。 这里构建了唯一 ID 列表,然后我们使用 concatMap 为每个 ID 创建一个 Observable,然后在移动到下一个项目之前等待延迟:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .concatMap( ( id, index ) => Rx.Observable.interval( 50 ).take( 1 ).map( () => { return id } ) )
            .flatMap(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                catch( ( error ) => this.handleError( error ) ))
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );

在此示例中,我在每次调用之间等待 50 毫秒。这大大减少了错误。

这是Github checkin现阶段的项目。

您的代码使用 merge 的问题是 spotifyService.lookupTrack 不是 return Observable,而是 Promise。正如 user3743222 所建议的那样,Observable 的一些功能如 flatMaphandle Promises as well, but the difference between an Observable and a Promise is that the Observable is lazy, while the Promise is not. You can make a lazy observable from a promise factory function using Observable.defer。这个小例子是在 JavaScript 而不是 TypeScript 中,所以它可以是 运行 在这里。

console.log = x => {var d = document,b=d.body,p=d.createElement('pre'); p.style.margin = "0"; p.appendChild(d.createTextNode(''+x)); b.appendChild(p);  window.scrollTo(0, b.scrollHeight); };

function log_delay(timeout, value) {
  return new Promise(resolve => {
    console.log('Start: ' + value);
    setTimeout(() => {
      console.log('End: ' + value);
      resolve(value);
    }, timeout);
  });
}

Rx.Observable.range(0, 6)
.map(x => Rx.Observable.defer(
  () => log_delay(1000, x)
  .catch(e => console.log('Inner catch'))
))
.merge(2)
.subscribe(
  s => console.log('Result: ' + s),
  s => console.log('Error: ' + s),
  s => console.log('Complete')
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>