Swift 合并:使用 zip 运算符出现意外的背压行为

Swift Combine: Unexpected backpressure behaviour with zip operator

我有一个关于 Combine 中的 zip 运算符与背压结合的问题。

采用以下代码片段:

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

subject.send("a")

在 playground 中执行此操作时,输出如下:

receive subscription: (0..<9223372036854775807)
receive subscription: (Zip)
request unlimited
request unlimited
receive value: (0)
receive value: (1)
receive value: (2)
receive value: (3)
receive value: (4)
receive value: (5)
receive value: (6)
receive value: (7)
...

在 iOS 设备上执行时,由于内存问题,代码在几秒后崩溃。

在上面的第四行中可以看出根本原因,其中 zipsequencePublisher 请求无限量的值。由于 sequencePublisher 提供整个范围的 Int 值,这会导致内存溢出。

我想知道的:

我的期望是 zip 只从每个发布者请求一个值,等待它们到达,并且只在从每个发布者收到一个值时才请求下一个值。

在这种特殊情况下,我尝试构建一种行为,其中将序列号分配给 subject 生成的每个值。但是,我可以想象,当 zip 组合来自发布频率非常不同的发布者的值时,这总是一个问题。

zip 运算符中使用背压似乎是解决该问题的完美工具。你知道为什么不是这样吗?这是错误还是故意的?如果是故意的,为什么?

谢谢大家

看来序列发布者只是不切实际。它似乎对背压没有反应;它只是一次吐出整个序列,这在一个发布应该是异步的世界中毫无意义。如果将 Int.max 更改为 3 则没有问题。 :) 我不知道这是一个错误还是序列发布者整个概念中的一个缺陷。

但是,对于您的实际用例来说确实没有问题,因为有一种更好的方法可以为主题的每个发射分配一个连续的数字,即 scan

这是一个更现实的方法:

func delay(_ delay:Double, closure:@escaping ()->()) {
    let when = DispatchTime.now() + delay
    DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController : UIViewController {
    var storage = Set<AnyCancellable>()
    override func viewDidLoad() {
        super.viewDidLoad()
        let subject = PassthroughSubject<String, Never>()
        subject.scan(("",0)) {t,s in (s,t.1+1)}
            .sink { print([=10=].0, [=10=].1)
            }.store(in:&storage)
        delay(1) {
            subject.send("a") // a 1
            delay(1) {
                subject.send("b") // b 2
            }
        }
    }
}

假设您有其他原因需要每个连续的枚举通过管道向下传递。但是如果你的 only 目标是在每个信号到达 sink 本身时枚举每个信号,你可以让 sink 本身维护一个计数器(它可以很容易做到,因为它是一个闭包):

    var storage = Set<AnyCancellable>()
    let subject = PassthroughSubject<String, Never>()
    override func viewDidLoad() {
        super.viewDidLoad()
        var counter = 1
        subject
            .sink {print([=11=], counter); counter += 1}
            .store(in:&storage)
        delay(1) {
            self.subject.send("a") // a 1
            self.subject.send("b") // b 2
        }
    }

Combine 的 zip 运算符:

  1. 将请求的背压需求从其下游订阅者转发到上游,这对于接收器是无限的
  2. 缓冲来自第一个上游的整个序列

除了scan-based解决方案,您可以通过控制下游的背压需求或使用自定义 zip 运算符来避免该问题。

我已经成功地开发了一个自定义的 zip 运算符,它在两个方面与原来的运算符不同:

  1. 无论它从下游接收到什么背压需求,它总是只向其上游发送一个值的需求,然后等待每个上游的响应,然后向下游发出结果,结束这一轮。如此重复,直到需求用尽。
  2. 它通过使用所描述的基于“轮”的方法避免缓冲整个上游序列。

此处包含的代码有点多,但请随时在此 repo 中查看 https://github.com/SergeBouts/XCombine