RxSwift - Observable.generate - 使用附加映射处理顺序请求

RxSwift - Observable.generate - Handle sequential requests with additional mapping

目标

我正在尝试实现一项使用范围 header 下载文件的服务。这允许我一次下载一个文件块。

实施 - Observable.generate()

为了为每个请求创建一个可观察对象并保存我尝试使用的文件:

let downloadObservable = Observable.generate(initialState: 0, condition: { [=11=] < fileSize }, iterate: { [=11=] + self.defaultChunkSize })

这似乎很管用!除了较大的文件外,它似乎有错误。我的请求被取消了。调试后,我发现我的工作流程并没有按照我的预期运行。这是附加到上面一行的其余工作流程。

.map( { (startChunk) -> (Int64, Int64) in
    // I determine the end chunk so I can download any size file in chunks of size X
    let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )

    return (startChunk, endChunk)
}).flatMap( { [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
    // I make the request via alamofire - UNEXPECTED FLOW HERE SEE NOTE #1
    return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)

}).flatMap( { [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
    // Upon receiving chunk response save to file
    return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)

}).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
    // Update progress if successful
    switch (saveResult) {
    case .success(let bytesSaved):
        progress.completedUnitCount += bytesSaved
    case .failure:
        break
    }

    return Observable.just(progress)
})

注#1

当我 运行 并调试它时,我的第一个 flatMap 循环直到发出 ALL 块请求。我期望这会更加顺序,我们将生成一个可观察对象,然后通过 flatMap 进行所有转换,然后循环回到开头。

这不是我应该实施的方式吗?

我需要在 Observable.generate() 上用 merge() 施展魔法吗?

我想我已经找到了这个问题的答案。关键是 map 通过网络请求,然后 concat 它们。这样做而不是使用 flatMapconcat 运算符将等到请求发送 onCompleted 后再开始下一个请求。代码如下:

let downloadObservable = Observable.generate(initialState: 0, condition: { [=10=] < fileSize }, iterate: { [=10=] + self.defaultChunkSize })
    .map( { (startChunk) -> (Int64, Int64) in
        let endChunk = (startChunk + self.defaultChunkSize > fileSize ? fileSize : startChunk + self.defaultChunkSize )
        return (startChunk, endChunk)
    }).map( { [unowned self] (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
        return self.makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
    }).concat()
    .flatMap( { [unowned self] (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
        return self.saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
    }).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
        if case .success(let bytesSaved) = saveResult {
            progress.completedUnitCount += bytesSaved
        }
        return Observable.just(progress)
    })

我想出如何将它分成 4 个批次。我将它展开一点并在代码中添加注释以提供帮助:

let generator = Observable.generate(initialState: 0, condition: { [=11=] < fileSize }, iterate: { [=11=] + defaultChunkSize })
let chunks  = generator.map( { (startChunk) -> (Int64, Int64) in
    let endChunk = (startChunk + defaultChunkSize > fileSize ? fileSize : startChunk + defaultChunkSize )
    return (startChunk, endChunk)
})
let requests = chunks.buffer(timeSpan: 0.0, count: 4, scheduler: MainScheduler.instance)// makes batches of four item arrays.
    .map { (batch) -> Observable<FileChunk> in
        let requests = Observable.from(batch) // spreads the four items back out.
        return requests.flatMap( { (startChunk: Int64, endChunk: Int64) -> Observable<FileChunk> in
            return makeChunkRequest(url: downloadUrl, startChunk: startChunk, endChunk: endChunk)
        }) // start the four requests as normal.
    }.concat() // wait until the four requests are finished before allowing the next four to begin.

let downloadObservable = requests
    .flatMap( { (fileChunk: FileChunk) -> Observable<FileSaveChunkResult> in
        return saveChunkToFile(fileChunk: fileChunk, location: localDestinationUrl)
    }).flatMap( { (saveResult: FileSaveChunkResult) -> Observable<Progress> in
        if case .success(let bytesSaved) = saveResult {
            progress.completedUnitCount += bytesSaved
        }
        return Observable.just(progress)
    })