RxSwift - 将序列拆分成更小的序列以连接
RxSwift - Split up sequence into smaller sequences to concat on
背景
我正在尝试使用 Range
header 通过块下载文件。
目标
我想将大量的 http 请求分成四个序列,然后我可以将它们连接起来一次处理 4 个请求。
当前排名
我目前正在处理我的序列并使用 concat
来确保第一个可观察请求在我开始第二个之前完成。这样做是为了确保我不会因太多请求使 Alamofire 过载,从而导致请求超时。
理想情况下,我想将我的序列分成四个相当相等的序列,因为 Alamofire 被设置为一次处理四个与主机的连接。我想这样做是因为我相信它会提高我的下载速度。
使用分块下载文件
Observable.generate(initialState: 0, condition: { [=10=] < fileSize }, iterate: {[=10=] + self.defaultChunkSize})
.map( { (startChunk) in
let endChunk = startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize
return (startChunk, endChunk)
})
.map({ (startChunk: Int, endChunk: Int) -> Observable<FileChunkResult> in
self.filesClient.downloadChunkOf(fileId: file.id, startChunk: Int64(startChunk), endChunk: Int64(endChunk))
})
.concat() // <----- This is where I am forcing the large sequence to do one observable at a time
.flatMap( { (result: FileChunkResult) -> Observable<FileSaveChunkResult> in
switch (result) {
case FileChunkResult.success(let chunkData):
return self.saveChunkToFile(fileChunk: chunkData, location: urlToSaveTo)
case FileChunkResult.failure: // Maybe change this to just default and return Observable.just(FileSaveChunkResult.failure)
break
case FileChunkResult.parserError:
break
}
return Observable.just(FileSaveChunkResult.failure)
})
.flatMap( { (result: FileSaveChunkResult) -> Observable<Progress> in
switch (result) {
case FileSaveChunkResult.success(let bytesSaved):
progress.completedUnitCount += bytesSaved
case FileSaveChunkResult.failure:
break
}
return Observable.just(progress)
})
下面的代码会将块分成四个大小相等的数组,这些数组使用 concat 来确保每个数组中一次只有一个保存处于活动状态。这意味着您将始终有 4 saveChunkToFile
个调用在任何时刻处于活动状态,无论任何特定调用有多快或多慢。
也就是说,它立即启动四个请求,然后在前面的一个请求完成时再启动一个请求。
let generator = Observable.generate(initialState: 0, condition: { [=10=] < fileSize }, iterate: { [=10=] + defaultChunkSize })
let chunks = generator.map( { (startChunk) -> (Int64, Int64) in
let endChunk = (startChunk + defaultChunkSize > fileSize ? fileSize : startChunk + defaultChunkSize )
return (startChunk, endChunk)
})
let count = ceil(Double(fileSize) / Double(defaultChunkSize) / 4)
let requests = chunks.window(timeSpan: 0.0, count: Int(count), scheduler: MainScheduler.instance)
.flatMap { [=10=]
.map( { (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
return makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
}).concat()
}
let downloadObservable = requests
.flatMap( { (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
return saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
if case .success(let bytesSaved) = saveResult {
progress.completedUnitCount += bytesSaved
}
return Observable.just(progress)
})
_ = downloadObservable.subscribe(onNext: { print(Date(), [=10=]) })
背景
我正在尝试使用 Range
header 通过块下载文件。
目标
我想将大量的 http 请求分成四个序列,然后我可以将它们连接起来一次处理 4 个请求。
当前排名
我目前正在处理我的序列并使用 concat
来确保第一个可观察请求在我开始第二个之前完成。这样做是为了确保我不会因太多请求使 Alamofire 过载,从而导致请求超时。
理想情况下,我想将我的序列分成四个相当相等的序列,因为 Alamofire 被设置为一次处理四个与主机的连接。我想这样做是因为我相信它会提高我的下载速度。
使用分块下载文件
Observable.generate(initialState: 0, condition: { [=10=] < fileSize }, iterate: {[=10=] + self.defaultChunkSize})
.map( { (startChunk) in
let endChunk = startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize
return (startChunk, endChunk)
})
.map({ (startChunk: Int, endChunk: Int) -> Observable<FileChunkResult> in
self.filesClient.downloadChunkOf(fileId: file.id, startChunk: Int64(startChunk), endChunk: Int64(endChunk))
})
.concat() // <----- This is where I am forcing the large sequence to do one observable at a time
.flatMap( { (result: FileChunkResult) -> Observable<FileSaveChunkResult> in
switch (result) {
case FileChunkResult.success(let chunkData):
return self.saveChunkToFile(fileChunk: chunkData, location: urlToSaveTo)
case FileChunkResult.failure: // Maybe change this to just default and return Observable.just(FileSaveChunkResult.failure)
break
case FileChunkResult.parserError:
break
}
return Observable.just(FileSaveChunkResult.failure)
})
.flatMap( { (result: FileSaveChunkResult) -> Observable<Progress> in
switch (result) {
case FileSaveChunkResult.success(let bytesSaved):
progress.completedUnitCount += bytesSaved
case FileSaveChunkResult.failure:
break
}
return Observable.just(progress)
})
下面的代码会将块分成四个大小相等的数组,这些数组使用 concat 来确保每个数组中一次只有一个保存处于活动状态。这意味着您将始终有 4 saveChunkToFile
个调用在任何时刻处于活动状态,无论任何特定调用有多快或多慢。
也就是说,它立即启动四个请求,然后在前面的一个请求完成时再启动一个请求。
let generator = Observable.generate(initialState: 0, condition: { [=10=] < fileSize }, iterate: { [=10=] + defaultChunkSize })
let chunks = generator.map( { (startChunk) -> (Int64, Int64) in
let endChunk = (startChunk + defaultChunkSize > fileSize ? fileSize : startChunk + defaultChunkSize )
return (startChunk, endChunk)
})
let count = ceil(Double(fileSize) / Double(defaultChunkSize) / 4)
let requests = chunks.window(timeSpan: 0.0, count: Int(count), scheduler: MainScheduler.instance)
.flatMap { [=10=]
.map( { (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
return makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
}).concat()
}
let downloadObservable = requests
.flatMap( { (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
return saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
if case .success(let bytesSaved) = saveResult {
progress.completedUnitCount += bytesSaved
}
return Observable.just(progress)
})
_ = downloadObservable.subscribe(onNext: { print(Date(), [=10=]) })