Swift 合并:使用 zip 运算符出现意外的背压行为
Swift Combine: Unexpected backpressure behaviour with zip operator
我有一个关于 Combine 中的 zip
运算符与背压结合的问题。
采用以下代码片段:
let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()
let handle = subject
.zip(sequencePublisher.print())
.print()
.sink { letters, digits in
print(letters, digits)
}
subject.send("a")
在 playground 中执行此操作时,输出如下:
receive subscription: (0..<9223372036854775807)
receive subscription: (Zip)
request unlimited
request unlimited
receive value: (0)
receive value: (1)
receive value: (2)
receive value: (3)
receive value: (4)
receive value: (5)
receive value: (6)
receive value: (7)
...
在 iOS 设备上执行时,由于内存问题,代码在几秒后崩溃。
在上面的第四行中可以看出根本原因,其中 zip
从 sequencePublisher
请求无限量的值。由于 sequencePublisher
提供整个范围的 Int
值,这会导致内存溢出。
我想知道的:
zip
在组合它们并将它们推送到 之前等待每个发布者的一个值
- 背压用于控制订阅者对发布者的需求
我的期望是 zip
只从每个发布者请求一个值,等待它们到达,并且只在从每个发布者收到一个值时才请求下一个值。
在这种特殊情况下,我尝试构建一种行为,其中将序列号分配给 subject
生成的每个值。但是,我可以想象,当 zip
组合来自发布频率非常不同的发布者的值时,这总是一个问题。
在 zip
运算符中使用背压似乎是解决该问题的完美工具。你知道为什么不是这样吗?这是错误还是故意的?如果是故意的,为什么?
谢谢大家
看来序列发布者只是不切实际。它似乎对背压没有反应;它只是一次吐出整个序列,这在一个发布应该是异步的世界中毫无意义。如果将 Int.max
更改为 3 则没有问题。 :) 我不知道这是一个错误还是序列发布者整个概念中的一个缺陷。
但是,对于您的实际用例来说确实没有问题,因为有一种更好的方法可以为主题的每个发射分配一个连续的数字,即 scan
。
这是一个更现实的方法:
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController : UIViewController {
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
let subject = PassthroughSubject<String, Never>()
subject.scan(("",0)) {t,s in (s,t.1+1)}
.sink { print([=10=].0, [=10=].1)
}.store(in:&storage)
delay(1) {
subject.send("a") // a 1
delay(1) {
subject.send("b") // b 2
}
}
}
}
假设您有其他原因需要每个连续的枚举通过管道向下传递。但是如果你的 only 目标是在每个信号到达 sink
本身时枚举每个信号,你可以让 sink
本身维护一个计数器(它可以很容易做到,因为它是一个闭包):
var storage = Set<AnyCancellable>()
let subject = PassthroughSubject<String, Never>()
override func viewDidLoad() {
super.viewDidLoad()
var counter = 1
subject
.sink {print([=11=], counter); counter += 1}
.store(in:&storage)
delay(1) {
self.subject.send("a") // a 1
self.subject.send("b") // b 2
}
}
Combine 的 zip 运算符:
- 将请求的背压需求从其下游订阅者转发到上游,这对于接收器是无限的
- 缓冲来自第一个上游的整个序列
除了scan-based解决方案,您可以通过控制下游的背压需求或使用自定义 zip 运算符来避免该问题。
我已经成功地开发了一个自定义的 zip 运算符,它在两个方面与原来的运算符不同:
- 无论它从下游接收到什么背压需求,它总是只向其上游发送一个值的需求,然后等待每个上游的响应,然后向下游发出结果,结束这一轮。如此重复,直到需求用尽。
- 它通过使用所描述的基于“轮”的方法避免缓冲整个上游序列。
此处包含的代码有点多,但请随时在此 repo 中查看 https://github.com/SergeBouts/XCombine
我有一个关于 Combine 中的 zip
运算符与背压结合的问题。
采用以下代码片段:
let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()
let handle = subject
.zip(sequencePublisher.print())
.print()
.sink { letters, digits in
print(letters, digits)
}
subject.send("a")
在 playground 中执行此操作时,输出如下:
receive subscription: (0..<9223372036854775807)
receive subscription: (Zip)
request unlimited
request unlimited
receive value: (0)
receive value: (1)
receive value: (2)
receive value: (3)
receive value: (4)
receive value: (5)
receive value: (6)
receive value: (7)
...
在 iOS 设备上执行时,由于内存问题,代码在几秒后崩溃。
在上面的第四行中可以看出根本原因,其中 zip
从 sequencePublisher
请求无限量的值。由于 sequencePublisher
提供整个范围的 Int
值,这会导致内存溢出。
我想知道的:
zip
在组合它们并将它们推送到 之前等待每个发布者的一个值
- 背压用于控制订阅者对发布者的需求
我的期望是 zip
只从每个发布者请求一个值,等待它们到达,并且只在从每个发布者收到一个值时才请求下一个值。
在这种特殊情况下,我尝试构建一种行为,其中将序列号分配给 subject
生成的每个值。但是,我可以想象,当 zip
组合来自发布频率非常不同的发布者的值时,这总是一个问题。
在 zip
运算符中使用背压似乎是解决该问题的完美工具。你知道为什么不是这样吗?这是错误还是故意的?如果是故意的,为什么?
谢谢大家
看来序列发布者只是不切实际。它似乎对背压没有反应;它只是一次吐出整个序列,这在一个发布应该是异步的世界中毫无意义。如果将 Int.max
更改为 3 则没有问题。 :) 我不知道这是一个错误还是序列发布者整个概念中的一个缺陷。
但是,对于您的实际用例来说确实没有问题,因为有一种更好的方法可以为主题的每个发射分配一个连续的数字,即 scan
。
这是一个更现实的方法:
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController : UIViewController {
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
let subject = PassthroughSubject<String, Never>()
subject.scan(("",0)) {t,s in (s,t.1+1)}
.sink { print([=10=].0, [=10=].1)
}.store(in:&storage)
delay(1) {
subject.send("a") // a 1
delay(1) {
subject.send("b") // b 2
}
}
}
}
假设您有其他原因需要每个连续的枚举通过管道向下传递。但是如果你的 only 目标是在每个信号到达 sink
本身时枚举每个信号,你可以让 sink
本身维护一个计数器(它可以很容易做到,因为它是一个闭包):
var storage = Set<AnyCancellable>()
let subject = PassthroughSubject<String, Never>()
override func viewDidLoad() {
super.viewDidLoad()
var counter = 1
subject
.sink {print([=11=], counter); counter += 1}
.store(in:&storage)
delay(1) {
self.subject.send("a") // a 1
self.subject.send("b") // b 2
}
}
Combine 的 zip 运算符:
- 将请求的背压需求从其下游订阅者转发到上游,这对于接收器是无限的
- 缓冲来自第一个上游的整个序列
除了scan-based解决方案,您可以通过控制下游的背压需求或使用自定义 zip 运算符来避免该问题。
我已经成功地开发了一个自定义的 zip 运算符,它在两个方面与原来的运算符不同:
- 无论它从下游接收到什么背压需求,它总是只向其上游发送一个值的需求,然后等待每个上游的响应,然后向下游发出结果,结束这一轮。如此重复,直到需求用尽。
- 它通过使用所描述的基于“轮”的方法避免缓冲整个上游序列。
此处包含的代码有点多,但请随时在此 repo 中查看 https://github.com/SergeBouts/XCombine