在看到特定数量的事件后处理下一个事件

Handle next events after seeing a specific number of them

我正在尝试做的是订阅一个发出 Enums 序列的可观察对象。目标是每次我看到 3 个 Enums 的特定类型时,我的 onNext 就会被调用。以下是我尝试过的但是,它只能使用一次。它不会继续下去。我想知道处理这个问题的最佳方法是什么。

enum Baseball {
 case strike, ball, hit
}

let bag = DisposeBag()
let subject = PublishSubject<Baseball>()

subject.filter { [=10=] == .strike }
  .elementAt(2)
  .subscribe(onNext: { _ in print("3 Strikes you're out") 
}).addDisposableTo(bag)

// First batter
subject.onNext(.strike)
subject.onNext(.ball)
subject.onNext(.ball)
subject.onNext(.ball)
subject.onNext(.strike)
subject.onNext(.strike) // 3 Strikes you're out is printed

// Second batter
subject.onNext(.ball)
subject.onNext(.ball)
subject.onNext(.hit)

// Third batter
subject.onNext(.strike)
subject.onNext(.strike)
subject.onNext(.strike) // Would like this to fire as well

使用缓冲运算符:

subject.filter { [=10=] == .strike }
    .buffer(timeSpan: 3e7, count: 3, scheduler: MainScheduler.instance)
    .subscribe(onNext: { print("3 Strikes you're out") })
    .addDisposableTo(bag)

现在每次有 3 次罢工时都会发出,或者大约每年一次。

如果您不喜欢它每年超时的事实,您可以编写自己的只需要计数的缓冲区运算符:

extension Observable {
    func buffer(count: Int) -> Observable<[E]> {
        return Observable<[E]>.create { observer in
            var elements: [E] = []
            let lock = NSRecursiveLock()
            return self.subscribe { event in
                switch event {
                case .completed:
                    observer.onCompleted()
                case .error(let error):
                    observer.onError(error)
                case .next(let element):
                    lock.lock(); defer { lock.unlock() }
                    elements.append(element)
                    if elements.count == count {
                        observer.onNext(elements)
                        elements = []
                    }
                }
            }
        }
    }
}

这是一个有趣的练习。

缓冲区

如@daniel-t 所示,此方法采用容量为 3 的 .buffer,当容量已满时,您知道有 3 次罢工。

计数器

除了buffer方法,你还可以使用计数器:

let subject = PublishSubject<Baseball>()

let strikes = subject.asObservable()
    .filter { [=10=] == .strike }
let ticker = strikes.map { _ in () }
let counter = ticker.scan(0) { (memo, _) -> Int in memo + 1 }

strikes.withLatestFrom(counter) { strike, count in (strike, count) }
    .filter { _, count in count % 3 == 0 }
    .subscribe(onNext: { _ in print("3 Strikes you're out")
    }).addDisposableTo(bag)

与一两个操作员一起,也可以使增加的罢工计数器在 3 次后重置为 0。

作为计数器的索引

let subject = PublishSubject<Baseball>()

let strikes = subject.asObservable()
    .filter { [=11=] == .strike }
// Unlike the counter, this starts at 0 and reports "% 3 == 0" too early.
strikes.mapWithIndex { _, index in index + 1 }
    .filter { [=11=] % 3 == 0 }
    .subscribe(onNext: { _ in print("3 Strikes you're out")
    }).addDisposableTo(bag)