rxjs 进行的限速 http 调用
Rate limiting http calls made by rxjs
我正在编写一项服务,人们可以在其中粘贴 Spotify 播放列表中的 urls,然后将播放列表导出到不同的服务中。对于粘贴在请求中的每个曲目 url 需要向 Spotify api.
此代码:
Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
.pluck<string>( "id" )
.distinct()
.flatMap(
( trackId ) => this.spotifyService.lookupTrack( trackId ).
catch( ( error ) => this.handleError( error ) ))
.subscribe(
( result ) => this.handleTrackLookupResult( result ),
( error ) => this.handleError( error ),
() => this.handleComplete()
);
- 从 ITrackIdentifiers 列表中创建一个 observable
- 采用轨道标识符的 id 来创建字符串 (ids) 的可观察值
- 删除列表中的所有重复 ID
- 为 spotify 的每个 http 调用创建一个可观察对象(并捕获错误)
- 使用 flatmap
将所有这些 observables 的结果合并到一个流中
除了添加大量曲目外,这实际上工作正常。我的一个示例播放列表有超过 500 首曲目,因此立即进行了 500 次调用,浏览器需要处理它们/缓存中的 return 项,因此浏览器速度很慢并锁定并识别 return 的负载超过 api 调用限制时的错误数。
我希望同时只能调用 10 个 运行。 Merge with maxConcurrent set seems like the perfect solution as discussed on Whosebug.
这看起来像这样:
Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
.pluck<string>( "id" )
.distinct()
.map(
( trackId ) => this.spotifyService.lookupTrack( trackId ).
catch( ( error ) => this.handleError( error ) ))
.merge(10)
.subscribe(
( result ) => this.handleTrackLookupResult( result ),
( error ) => this.handleError( error ),
() => this.handleComplete()
);
但这就是行不通。在 chrome 网络调试器中,您可以看到所有同时进行的调用,并且大部分都在排队等待很长时间,直到它们失败。
为什么这不起作用?我还能如何解决这个问题?
现阶段项目Github checkin如下:
我设法让它按我想要的方式工作,但我仍然很好奇为什么合并不起作用。
这里构建了唯一 ID 列表,然后我们使用 concatMap 为每个 ID 创建一个 Observable,然后在移动到下一个项目之前等待延迟:
Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
.pluck<string>( "id" )
.distinct()
.concatMap( ( id, index ) => Rx.Observable.interval( 50 ).take( 1 ).map( () => { return id } ) )
.flatMap(
( trackId ) => this.spotifyService.lookupTrack( trackId ).
catch( ( error ) => this.handleError( error ) ))
.subscribe(
( result ) => this.handleTrackLookupResult( result ),
( error ) => this.handleError( error ),
() => this.handleComplete()
);
在此示例中,我在每次调用之间等待 50 毫秒。这大大减少了错误。
这是Github checkin现阶段的项目。
您的代码使用 merge
的问题是 spotifyService.lookupTrack
不是 return Observable
,而是 Promise
。正如 user3743222 所建议的那样,Observable
的一些功能如 flatMap
将 handle Promise
s as well, but the difference between an Observable
and a Promise
is that the Observable
is lazy, while the Promise
is not. You can make a lazy observable from a promise factory function using Observable.defer
。这个小例子是在 JavaScript 而不是 TypeScript 中,所以它可以是 运行 在这里。
console.log = x => {var d = document,b=d.body,p=d.createElement('pre'); p.style.margin = "0"; p.appendChild(d.createTextNode(''+x)); b.appendChild(p); window.scrollTo(0, b.scrollHeight); };
function log_delay(timeout, value) {
return new Promise(resolve => {
console.log('Start: ' + value);
setTimeout(() => {
console.log('End: ' + value);
resolve(value);
}, timeout);
});
}
Rx.Observable.range(0, 6)
.map(x => Rx.Observable.defer(
() => log_delay(1000, x)
.catch(e => console.log('Inner catch'))
))
.merge(2)
.subscribe(
s => console.log('Result: ' + s),
s => console.log('Error: ' + s),
s => console.log('Complete')
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>
我正在编写一项服务,人们可以在其中粘贴 Spotify 播放列表中的 urls,然后将播放列表导出到不同的服务中。对于粘贴在请求中的每个曲目 url 需要向 Spotify api.
此代码:
Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
.pluck<string>( "id" )
.distinct()
.flatMap(
( trackId ) => this.spotifyService.lookupTrack( trackId ).
catch( ( error ) => this.handleError( error ) ))
.subscribe(
( result ) => this.handleTrackLookupResult( result ),
( error ) => this.handleError( error ),
() => this.handleComplete()
);
- 从 ITrackIdentifiers 列表中创建一个 observable
- 采用轨道标识符的 id 来创建字符串 (ids) 的可观察值
- 删除列表中的所有重复 ID
- 为 spotify 的每个 http 调用创建一个可观察对象(并捕获错误)
- 使用 flatmap 将所有这些 observables 的结果合并到一个流中
除了添加大量曲目外,这实际上工作正常。我的一个示例播放列表有超过 500 首曲目,因此立即进行了 500 次调用,浏览器需要处理它们/缓存中的 return 项,因此浏览器速度很慢并锁定并识别 return 的负载超过 api 调用限制时的错误数。
我希望同时只能调用 10 个 运行。 Merge with maxConcurrent set seems like the perfect solution as discussed on Whosebug.
这看起来像这样:
Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
.pluck<string>( "id" )
.distinct()
.map(
( trackId ) => this.spotifyService.lookupTrack( trackId ).
catch( ( error ) => this.handleError( error ) ))
.merge(10)
.subscribe(
( result ) => this.handleTrackLookupResult( result ),
( error ) => this.handleError( error ),
() => this.handleComplete()
);
但这就是行不通。在 chrome 网络调试器中,您可以看到所有同时进行的调用,并且大部分都在排队等待很长时间,直到它们失败。
为什么这不起作用?我还能如何解决这个问题?
现阶段项目Github checkin如下:
我设法让它按我想要的方式工作,但我仍然很好奇为什么合并不起作用。 这里构建了唯一 ID 列表,然后我们使用 concatMap 为每个 ID 创建一个 Observable,然后在移动到下一个项目之前等待延迟:
Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
.pluck<string>( "id" )
.distinct()
.concatMap( ( id, index ) => Rx.Observable.interval( 50 ).take( 1 ).map( () => { return id } ) )
.flatMap(
( trackId ) => this.spotifyService.lookupTrack( trackId ).
catch( ( error ) => this.handleError( error ) ))
.subscribe(
( result ) => this.handleTrackLookupResult( result ),
( error ) => this.handleError( error ),
() => this.handleComplete()
);
在此示例中,我在每次调用之间等待 50 毫秒。这大大减少了错误。
这是Github checkin现阶段的项目。
您的代码使用 merge
的问题是 spotifyService.lookupTrack
不是 return Observable
,而是 Promise
。正如 user3743222 所建议的那样,Observable
的一些功能如 flatMap
将 handle Promise
s as well, but the difference between an Observable
and a Promise
is that the Observable
is lazy, while the Promise
is not. You can make a lazy observable from a promise factory function using Observable.defer
。这个小例子是在 JavaScript 而不是 TypeScript 中,所以它可以是 运行 在这里。
console.log = x => {var d = document,b=d.body,p=d.createElement('pre'); p.style.margin = "0"; p.appendChild(d.createTextNode(''+x)); b.appendChild(p); window.scrollTo(0, b.scrollHeight); };
function log_delay(timeout, value) {
return new Promise(resolve => {
console.log('Start: ' + value);
setTimeout(() => {
console.log('End: ' + value);
resolve(value);
}, timeout);
});
}
Rx.Observable.range(0, 6)
.map(x => Rx.Observable.defer(
() => log_delay(1000, x)
.catch(e => console.log('Inner catch'))
))
.merge(2)
.subscribe(
s => console.log('Result: ' + s),
s => console.log('Error: ' + s),
s => console.log('Complete')
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>