RxSwift - Observable.generate - 使用附加映射处理顺序请求
RxSwift - Observable.generate - Handle sequential requests with additional mapping
目标
我正在尝试实现一项使用范围 header 下载文件的服务。这允许我一次下载一个文件块。
实施 - Observable.generate()
为了为每个请求创建一个可观察对象并保存我尝试使用的文件:
let downloadObservable = Observable.generate(initialState: 0, condition: { [=11=] < fileSize }, iterate: { [=11=] + self.defaultChunkSize })
这似乎很管用!除了较大的文件外,它似乎有错误。我的请求被取消了。调试后,我发现我的工作流程并没有按照我的预期运行。这是附加到上面一行的其余工作流程。
.map( { (startChunk) -> (Int64, Int64) in
// I determine the end chunk so I can download any size file in chunks of size X
let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )
return (startChunk, endChunk)
}).flatMap( { [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
// I make the request via alamofire - UNEXPECTED FLOW HERE SEE NOTE #1
return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
}).flatMap( { [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
// Upon receiving chunk response save to file
return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
// Update progress if successful
switch (saveResult) {
case .success(let bytesSaved):
progress.completedUnitCount += bytesSaved
case .failure:
break
}
return Observable.just(progress)
})
注#1
当我 运行 并调试它时,我的第一个 flatMap
循环直到发出 ALL 块请求。我期望这会更加顺序,我们将生成一个可观察对象,然后通过 flatMap
进行所有转换,然后循环回到开头。
这不是我应该实施的方式吗?
我需要在 Observable.generate()
上用 merge()
施展魔法吗?
我想我已经找到了这个问题的答案。关键是 map
通过网络请求,然后 concat
它们。这样做而不是使用 flatMap
。 concat
运算符将等到请求发送 onCompleted 后再开始下一个请求。代码如下:
let downloadObservable = Observable.generate(initialState: 0, condition: { [=10=] < fileSize }, iterate: { [=10=] + self.defaultChunkSize })
.map( { (startChunk) -> (Int64, Int64) in
let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )
return (startChunk, endChunk)
}).map( { [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
}).concat()
.flatMap( { [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
if case .success(let bytesSaved) = saveResult {
progress.completedUnitCount += bytesSaved
}
return Observable.just(progress)
})
我想出如何将它分成 4 个批次。我将它展开一点并在代码中添加注释以提供帮助:
let generator = Observable.generate(initialState: 0, condition: { [=11=] < fileSize }, iterate: { [=11=] + defaultChunkSize })
let chunks = generator.map( { (startChunk) -> (Int64, Int64) in
let endChunk = (startChunk + defaultChunkSize > fileSize ? fileSize : startChunk + defaultChunkSize )
return (startChunk, endChunk)
})
let requests = chunks.buffer(timeSpan: 0.0, count: 4, scheduler: MainScheduler.instance)// makes batches of four item arrays.
.map { (batch) -> Observable<FileChunk> in
let requests = Observable.from(batch) // spreads the four items back out.
return requests.flatMap( { (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
return makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
}) // start the four requests as normal.
}.concat() // wait until the four requests are finished before allowing the next four to begin.
let downloadObservable = requests
.flatMap( { (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
return saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
if case .success(let bytesSaved) = saveResult {
progress.completedUnitCount += bytesSaved
}
return Observable.just(progress)
})
目标
我正在尝试实现一项使用范围 header 下载文件的服务。这允许我一次下载一个文件块。
实施 - Observable.generate()
为了为每个请求创建一个可观察对象并保存我尝试使用的文件:
let downloadObservable = Observable.generate(initialState: 0, condition: { [=11=] < fileSize }, iterate: { [=11=] + self.defaultChunkSize })
这似乎很管用!除了较大的文件外,它似乎有错误。我的请求被取消了。调试后,我发现我的工作流程并没有按照我的预期运行。这是附加到上面一行的其余工作流程。
.map( { (startChunk) -> (Int64, Int64) in
// I determine the end chunk so I can download any size file in chunks of size X
let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )
return (startChunk, endChunk)
}).flatMap( { [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
// I make the request via alamofire - UNEXPECTED FLOW HERE SEE NOTE #1
return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
}).flatMap( { [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
// Upon receiving chunk response save to file
return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
// Update progress if successful
switch (saveResult) {
case .success(let bytesSaved):
progress.completedUnitCount += bytesSaved
case .failure:
break
}
return Observable.just(progress)
})
注#1
当我 运行 并调试它时,我的第一个 flatMap
循环直到发出 ALL 块请求。我期望这会更加顺序,我们将生成一个可观察对象,然后通过 flatMap
进行所有转换,然后循环回到开头。
这不是我应该实施的方式吗?
我需要在 Observable.generate()
上用 merge()
施展魔法吗?
我想我已经找到了这个问题的答案。关键是 map
通过网络请求,然后 concat
它们。这样做而不是使用 flatMap
。 concat
运算符将等到请求发送 onCompleted 后再开始下一个请求。代码如下:
let downloadObservable = Observable.generate(initialState: 0, condition: { [=10=] < fileSize }, iterate: { [=10=] + self.defaultChunkSize })
.map( { (startChunk) -> (Int64, Int64) in
let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )
return (startChunk, endChunk)
}).map( { [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
}).concat()
.flatMap( { [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
if case .success(let bytesSaved) = saveResult {
progress.completedUnitCount += bytesSaved
}
return Observable.just(progress)
})
我想出如何将它分成 4 个批次。我将它展开一点并在代码中添加注释以提供帮助:
let generator = Observable.generate(initialState: 0, condition: { [=11=] < fileSize }, iterate: { [=11=] + defaultChunkSize })
let chunks = generator.map( { (startChunk) -> (Int64, Int64) in
let endChunk = (startChunk + defaultChunkSize > fileSize ? fileSize : startChunk + defaultChunkSize )
return (startChunk, endChunk)
})
let requests = chunks.buffer(timeSpan: 0.0, count: 4, scheduler: MainScheduler.instance)// makes batches of four item arrays.
.map { (batch) -> Observable<FileChunk> in
let requests = Observable.from(batch) // spreads the four items back out.
return requests.flatMap( { (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
return makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
}) // start the four requests as normal.
}.concat() // wait until the four requests are finished before allowing the next four to begin.
let downloadObservable = requests
.flatMap( { (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
return saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
if case .success(let bytesSaved) = saveResult {
progress.completedUnitCount += bytesSaved
}
return Observable.just(progress)
})