如何实现串行网络调用队列然后在 RxSwift 中处理?
How to implement a queue of serial network calls then processing in RxSwift?
我正在开发一个应用程序,我想使用 RxSwift 和 RxCocoa 实现以下目标
- 下载 JSON 包含 url 到 X 个文件
- 下载文件1,处理文件1
- 下载文件2,处理文件2
- 下载文件3,处理文件3
...等等
这里的关键是每个文件的处理必须在下载下一个文件之前完成。至少文件处理的顺序必须按顺序执行。如果我可以在处理文件 1 时开始下载文件 2,那就太棒了,但这不是必需的。
我试过使用 SerialDispatchQueueScheduler 来完成这项工作,但由于文件大小不同,每个文件的下载完成时间不同,因此处理代码的触发顺序与我开始时的顺序不同下载。
通过使用 NSOperations 等,我可以在不使用 Rx 的情况下轻松实现这一点,但我想在此应用程序中继续使用 Rx,因为这是我在此应用程序的其他地方使用的。
下面我包含了一些代码片段。为了这个问题添加了评论。
.flatMap { [unowned self] (tasks: [DiffTask]) -> Observable<ApplyDiffStatus> in
return Observable.from(tasks)
.observeOn(self.backgroundScheduler) // Whosebug: backgroundScheduler is a SerialDispatchQueueScheduler
.flatMapWithIndex({ [unowned self] (task, index) in
return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // Whosebug: Downloads a file from a URL
})
.catchError({ (error) -> Observable<DictionaryUpdater.DiffTaskProgress> in
observable.onError(error)
throw error
})
.map({ (diffTask : DiffTaskProgress) -> DiffTaskProgress.Progress in
// Stack Overflow: I've wrapped much of the progress observable in a Observable<UpdateProgress>
switch diffTask.progress {
case .started(currentTask: let currentTask, taskCount: let taskCount):
observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
case .finished(data: _, currentTask: let currentTask, taskCount: let taskCount):
observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
case .progress(completion: _, currentTask: let currentTask, taskCount: let taskCount):
observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
}
return diffTask.progress
})
.flatMap({ [unowned self] (progress: DiffTaskProgress.Progress) -> Observable<ApplyDiffStatus> in
switch progress {
case .finished(data: let data, currentTask: let currentTask, taskCount: let taskCount):
return self.applyDiff(data, currentTask: currentTask, taskCount: taskCount) // Whosebug: PROCESSES THE FILE THAT WAS DOWNLOADED
default:
return Observable.empty()
}
})
}
我设法使用 concatMap
运算符解决了这个问题。所以而不是
.flatMapWithIndex({ [unowned self] (task, index) in
return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // Whosebug: Downloads a file from a URL
})
我做了这样的事情:
tasks.enumerated().concatMap { (index, task) in
return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count)
}
concatMap
运算符确保第一个可观察对象在发出更多信号之前完成。我不得不使用 enumerated()
,因为 concatMap 没有附带 concatMapWithIndex
,但它有效:)
我正在开发一个应用程序,我想使用 RxSwift 和 RxCocoa 实现以下目标
- 下载 JSON 包含 url 到 X 个文件
- 下载文件1,处理文件1
- 下载文件2,处理文件2
- 下载文件3,处理文件3
...等等
这里的关键是每个文件的处理必须在下载下一个文件之前完成。至少文件处理的顺序必须按顺序执行。如果我可以在处理文件 1 时开始下载文件 2,那就太棒了,但这不是必需的。
我试过使用 SerialDispatchQueueScheduler 来完成这项工作,但由于文件大小不同,每个文件的下载完成时间不同,因此处理代码的触发顺序与我开始时的顺序不同下载。
通过使用 NSOperations 等,我可以在不使用 Rx 的情况下轻松实现这一点,但我想在此应用程序中继续使用 Rx,因为这是我在此应用程序的其他地方使用的。
下面我包含了一些代码片段。为了这个问题添加了评论。
.flatMap { [unowned self] (tasks: [DiffTask]) -> Observable<ApplyDiffStatus> in
return Observable.from(tasks)
.observeOn(self.backgroundScheduler) // Whosebug: backgroundScheduler is a SerialDispatchQueueScheduler
.flatMapWithIndex({ [unowned self] (task, index) in
return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // Whosebug: Downloads a file from a URL
})
.catchError({ (error) -> Observable<DictionaryUpdater.DiffTaskProgress> in
observable.onError(error)
throw error
})
.map({ (diffTask : DiffTaskProgress) -> DiffTaskProgress.Progress in
// Stack Overflow: I've wrapped much of the progress observable in a Observable<UpdateProgress>
switch diffTask.progress {
case .started(currentTask: let currentTask, taskCount: let taskCount):
observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
case .finished(data: _, currentTask: let currentTask, taskCount: let taskCount):
observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
case .progress(completion: _, currentTask: let currentTask, taskCount: let taskCount):
observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
}
return diffTask.progress
})
.flatMap({ [unowned self] (progress: DiffTaskProgress.Progress) -> Observable<ApplyDiffStatus> in
switch progress {
case .finished(data: let data, currentTask: let currentTask, taskCount: let taskCount):
return self.applyDiff(data, currentTask: currentTask, taskCount: taskCount) // Whosebug: PROCESSES THE FILE THAT WAS DOWNLOADED
default:
return Observable.empty()
}
})
}
我设法使用 concatMap
运算符解决了这个问题。所以而不是
.flatMapWithIndex({ [unowned self] (task, index) in
return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // Whosebug: Downloads a file from a URL
})
我做了这样的事情:
tasks.enumerated().concatMap { (index, task) in
return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count)
}
concatMap
运算符确保第一个可观察对象在发出更多信号之前完成。我不得不使用 enumerated()
,因为 concatMap 没有附带 concatMapWithIndex
,但它有效:)