concatMap / flatMap 应该 运行 在同一个调度器上立即
concatMap / flatMap should run immediately, on the same scheduler
给定一个服务对象,我想确保对服务的每个函数调用都不会产生副作用。在我的例子中,无论函数 A 在做什么,除非调度程序可用,否则函数 B 中什么都不会执行。
这是它的样子:
class Service {
func handleJobA(input: String) -> Observable<String> {
return Observable.just(input)
.do(onNext: { (str) in
print ("Job A: \(str)")
})
.concatMap { input -> Observable<String> in
return Observable.just("Job AA: \(input)")
.delay(2, scheduler: self.scheduler)
.do(onNext: { (str) in
print (str)
})
}
.subscribeOn(scheduler)
}
func handleJobB(input: String) -> Observable<String> {
return Observable.just(input)
.do(onNext: { (str) in
print ("Job B: \(str)")
})
.delay(1, scheduler: scheduler)
.concatMap { input -> Observable<String> in
return Observable.just("Job BB: \(input)")
.do(onNext: { (str) in
print (str)
})
}
.subscribeOn(scheduler)
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}
let service = Service()
_ = Observable.from(["1","2","3"])
.concatMap { service.handleJobA(input: [=11=]) }
.subscribe(onNext:{
print([=11=] + " √")
})
_ = Observable.from(["1","2","3"])
.concatMap { service.handleJobB(input: [=11=]) }
.subscribe(onNext:{
print([=11=] + " √")
})
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
此时输出为:
Job A: 1
Job B: 1
Job BB: 1
Job BB: 1 √
Job B: 2
Job AA: 1
Job AA: 1 √
Job A: 2
Job BB: 2
Job BB: 2 √
Job B: 3
Job BB: 3
Job BB: 3 √
Job AA: 2
Job AA: 2 √
Job A: 3
Job AA: 3
Job AA: 3 √
然而,这说明了根本问题。内部延迟(这可能发生在任何事情上,真的......网络,处理)导致可观察到的处理超出 "order".
我要的是这个:
Job A: 1
Job AA: 1
Job AA: 1 √
Job B: 1
Job BB: 1
Job BB: 1 √
Job B: 2
Job BB: 2
Job BB: 2 √
Job B: 3
Job BB: 3
Job BB: 3 √
Job A: 2
Job AA: 2
Job AA: 2 √
Job A: 3
Job AA: 3
Job AA: 3 √
这意味着,一旦某个函数开始处理任务,除非完成,否则其他任何人都无法访问。
我之前收到了很好的。它并不完全适用,因为 flatMap/concatMap (?) 两者似乎都不喜欢调度程序。
我的理论是 concatMap 调用确实做了正确的工作,但是随后将遗漏的子序列安排到调度程序队列的末尾,而我希望它在前面,接下来要处理。
我无法解释调度程序的行为...但我可以提出一个小建议
...once a function has started processing a task, no one else get's
access unless it is done...
您可以通过 concatMap
传递所有 handleJob
调用以获得您需要的行为:
Observable
.from([1,2,3,4,5,6])
.flatMap({ (value) -> Observable<String> in
switch value % 2 == 0 {
case true:
return service.handleJobA(input: "\(value)")
case false:
return service.handleJobB(input: "\(value)")
}
})
.subscribe(onNext:{
print([=10=] + " √")
})
服务class示例:
private class Service {
private lazy var result = PublishSubject<(index: Int, result: String)>()
private lazy var publish = PublishSubject<(index: Int, input: String, transformation: (String) -> String)>()
private lazy var index: Int = 0
private lazy var disposeBag = DisposeBag()
init() {
publish
.asObservable()
.concatMap({ (index, input, transformation) -> Observable<(index: Int, result: String)> in
let dueTime = RxTimeInterval(arc4random_uniform(3) + 1)
return Observable
.just((index: index, result: transformation(input)))
.delay(dueTime, scheduler: self.scheduler)
})
.bind(to: result)
.disposed(by: disposeBag)
}
func handleJobA(input: String) -> Observable<String> {
let transformation: (String) -> String = { string in
return "Job A: \(string)"
}
return handleJob(input: input, transformation: transformation)
}
func handleJobB(input: String) -> Observable<String> {
let transformation: (String) -> String = { string in
return "Job B: \(string)"
}
return handleJob(input: input, transformation: transformation)
}
func handleJob(input: String, transformation: @escaping (String) -> String) -> Observable<String> {
index += 1
defer {
publish.onNext((index, input, transformation))
}
return result
.filter({ [expected = index] (index, result) -> Bool in
return expected == index
})
.map({ [=11=].result })
.take(1)
.shareReplayLatestWhileConnected()
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}
给定一个服务对象,我想确保对服务的每个函数调用都不会产生副作用。在我的例子中,无论函数 A 在做什么,除非调度程序可用,否则函数 B 中什么都不会执行。
这是它的样子:
class Service {
func handleJobA(input: String) -> Observable<String> {
return Observable.just(input)
.do(onNext: { (str) in
print ("Job A: \(str)")
})
.concatMap { input -> Observable<String> in
return Observable.just("Job AA: \(input)")
.delay(2, scheduler: self.scheduler)
.do(onNext: { (str) in
print (str)
})
}
.subscribeOn(scheduler)
}
func handleJobB(input: String) -> Observable<String> {
return Observable.just(input)
.do(onNext: { (str) in
print ("Job B: \(str)")
})
.delay(1, scheduler: scheduler)
.concatMap { input -> Observable<String> in
return Observable.just("Job BB: \(input)")
.do(onNext: { (str) in
print (str)
})
}
.subscribeOn(scheduler)
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}
let service = Service()
_ = Observable.from(["1","2","3"])
.concatMap { service.handleJobA(input: [=11=]) }
.subscribe(onNext:{
print([=11=] + " √")
})
_ = Observable.from(["1","2","3"])
.concatMap { service.handleJobB(input: [=11=]) }
.subscribe(onNext:{
print([=11=] + " √")
})
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
此时输出为:
Job A: 1
Job B: 1
Job BB: 1
Job BB: 1 √
Job B: 2
Job AA: 1
Job AA: 1 √
Job A: 2
Job BB: 2
Job BB: 2 √
Job B: 3
Job BB: 3
Job BB: 3 √
Job AA: 2
Job AA: 2 √
Job A: 3
Job AA: 3
Job AA: 3 √
然而,这说明了根本问题。内部延迟(这可能发生在任何事情上,真的......网络,处理)导致可观察到的处理超出 "order".
我要的是这个:
Job A: 1
Job AA: 1
Job AA: 1 √
Job B: 1
Job BB: 1
Job BB: 1 √
Job B: 2
Job BB: 2
Job BB: 2 √
Job B: 3
Job BB: 3
Job BB: 3 √
Job A: 2
Job AA: 2
Job AA: 2 √
Job A: 3
Job AA: 3
Job AA: 3 √
这意味着,一旦某个函数开始处理任务,除非完成,否则其他任何人都无法访问。
我之前收到了很好的
我的理论是 concatMap 调用确实做了正确的工作,但是随后将遗漏的子序列安排到调度程序队列的末尾,而我希望它在前面,接下来要处理。
我无法解释调度程序的行为...但我可以提出一个小建议
...once a function has started processing a task, no one else get's access unless it is done...
您可以通过 concatMap
传递所有 handleJob
调用以获得您需要的行为:
Observable
.from([1,2,3,4,5,6])
.flatMap({ (value) -> Observable<String> in
switch value % 2 == 0 {
case true:
return service.handleJobA(input: "\(value)")
case false:
return service.handleJobB(input: "\(value)")
}
})
.subscribe(onNext:{
print([=10=] + " √")
})
服务class示例:
private class Service {
private lazy var result = PublishSubject<(index: Int, result: String)>()
private lazy var publish = PublishSubject<(index: Int, input: String, transformation: (String) -> String)>()
private lazy var index: Int = 0
private lazy var disposeBag = DisposeBag()
init() {
publish
.asObservable()
.concatMap({ (index, input, transformation) -> Observable<(index: Int, result: String)> in
let dueTime = RxTimeInterval(arc4random_uniform(3) + 1)
return Observable
.just((index: index, result: transformation(input)))
.delay(dueTime, scheduler: self.scheduler)
})
.bind(to: result)
.disposed(by: disposeBag)
}
func handleJobA(input: String) -> Observable<String> {
let transformation: (String) -> String = { string in
return "Job A: \(string)"
}
return handleJob(input: input, transformation: transformation)
}
func handleJobB(input: String) -> Observable<String> {
let transformation: (String) -> String = { string in
return "Job B: \(string)"
}
return handleJob(input: input, transformation: transformation)
}
func handleJob(input: String, transformation: @escaping (String) -> String) -> Observable<String> {
index += 1
defer {
publish.onNext((index, input, transformation))
}
return result
.filter({ [expected = index] (index, result) -> Bool in
return expected == index
})
.map({ [=11=].result })
.take(1)
.shareReplayLatestWhileConnected()
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}