Swift 合并:缓冲上游值并以稳定的速率发出它们?
Swift Combine: Buffer upstream values and emit them at a steady rate?
在 iOS 13.
中使用新的 Combine 框架
假设我有一个上游发布者以非常不规则的速率发送值 - 有时几秒钟或几分钟可能没有任何值,然后可能会立即出现一连串的值。我想创建一个自定义发布者,它订阅上游值,缓冲它们并在它们进入时以已知的常规节奏发出它们,但如果它们都用完了则不发布任何内容。
具体例子:
- t = 0 到 5000 毫秒:未发布上游值
- t = 5001ms:上游发布"a"
- t = 5002ms:上游发布"b"
- t = 5003ms:上游发布"c"
- t = 5004ms 到 10000ms:没有发布上游值
- t = 10001ms:上游发布"d"
我订阅上游的发布者每 1 秒产生一次值:
- t = 0 到 5000 毫秒:未发布任何值
- t = 5001ms:发布 "a"
- t = 6001ms:发布 "b"
- t = 7001ms:发布 "c"
- t = 7001ms 到 10001ms:未发布值
- t = 10001ms:发布 "d"
Combine 中的 None 现有发布者或运营商似乎 相当 在这里做我想做的事。
throttle
and debounce
将简单地以特定节奏对上游值进行采样并丢弃缺失的值(例如,如果节奏为 1000 毫秒,则只会发布 "a")
delay
会为每个值添加相同的延迟,但不会 space 它们出来(例如,如果我的延迟是 1000 毫秒,它会在 6001 毫秒发布 "a","b" 在 6002 毫秒,"c" 在 6003 毫秒)
buffer
似乎很有前途,但我不太清楚如何使用它——如何强制它按需从缓冲区发布一个值。当我将接收器连接到 buffer
时,它似乎只是立即发布所有值,根本没有缓冲。
我考虑过使用某种组合运算符,如 zip
或 merge
或 combineLatest
并将其与计时器发布器组合,这可能是正确的方法,但我可以弄清楚如何配置它以提供我想要的行为。
编辑
这是一张大理石图,希望能说明我的意图:
Upstream Publisher:
-A-B-C-------------------D-E-F--------|>
My Custom Operator:
-A----B----C-------------D----E----F--|>
编辑 2:单元测试
这是一个单元测试,如果 modulatedPublisher
(我想要的缓冲发布者)按预期工作,它应该会通过。它并不完美,但它会在收到事件(包括收到的时间)时存储它们,然后比较事件之间的时间间隔,确保它们不小于所需的间隔。
func testCustomPublisher() {
let expectation = XCTestExpectation(description: "async")
var events = [Event]()
let passthroughSubject = PassthroughSubject<Int, Never>()
let cancellable = passthroughSubject
.modulatedPublisher(interval: 1.0)
.sink { value in
events.append(Event(value: value, date: Date()))
print("value received: \(value) at \(self.dateFormatter.string(from:Date()))")
}
// WHEN I send 3 events, wait 6 seconds, and send 3 more events
passthroughSubject.send(1)
passthroughSubject.send(2)
passthroughSubject.send(3)
DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(6000)) {
passthroughSubject.send(4)
passthroughSubject.send(5)
passthroughSubject.send(6)
DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(4000)) {
// THEN I expect the stored events to be no closer together in time than the interval of 1.0s
for i in 1 ..< events.count {
let interval = events[i].date.timeIntervalSince(events[i-1].date)
print("Interval: \(interval)")
// There's some small error in the interval but it should be about 1 second since I'm using a 1s modulated publisher.
XCTAssertTrue(interval > 0.99)
}
expectation.fulfill()
}
}
wait(for: [expectation], timeout: 15)
}
我最接近的是使用 zip
,像这样:
public extension Publisher where Self.Failure == Never {
func modulatedPublisher(interval: TimeInterval) -> AnyPublisher<Output, Never> {
let timerBuffer = Timer
.publish(every: interval, on: .main, in: .common)
.autoconnect()
return timerBuffer
.zip(self, { }) // should emit one input element () every timer tick
.eraseToAnyPublisher()
}
}
这会正确调整前三个事件(1、2 和 3),但不会调整后三个事件(4、5 和 6)。输出:
value received: 1 at 3:54:07.0007
value received: 2 at 3:54:08.0008
value received: 3 at 3:54:09.0009
value received: 4 at 3:54:12.0012
value received: 5 at 3:54:12.0012
value received: 6 at 3:54:12.0012
我相信这是因为 zip
有一些内部缓冲能力。前三个上游事件被缓冲并按照定时器的节奏发出,但是在 6 秒等待期间,定时器的事件被缓冲 - 当第二个设置上游事件被触发时,已经有定时器事件在队列中等待,所以它们配对并立即发射。
这是一个有趣的问题。我玩过 Timer.publish
、buffer
、zip
和 throttle
的各种组合,但我无法让任何组合按照您想要的方式工作。因此,让我们编写一个自定义订阅者。
我们真正想要的是 API 当我们从上游获得输入时,我们还能够控制上游何时交付下一个输入。像这样:
extension Publisher {
/// Subscribe to me with a stepping function.
/// - parameter stepper: A function I'll call with each of my inputs, and with my completion.
/// Each time I call this function with an input, I also give it a promise function.
/// I won't deliver the next input until the promise is called with a `.more` argument.
/// - returns: An object you can use to cancel the subscription asynchronously.
func step(with stepper: @escaping (StepEvent<Output, Failure>) -> ()) -> AnyCancellable {
???
}
}
enum StepEvent<Input, Failure: Error> {
/// Handle the Input. Call `StepPromise` when you're ready for the next Input,
/// or to cancel the subscription.
case input(Input, StepPromise)
/// Upstream completed the subscription.
case completion(Subscribers.Completion<Failure>)
}
/// The type of callback given to the stepper function to allow it to continue
/// or cancel the stream.
typealias StepPromise = (StepPromiseRequest) -> ()
enum StepPromiseRequest {
// Pass this to the promise to request the next item from upstream.
case more
// Pass this to the promise to cancel the subscription.
case cancel
}
有了这个 step
API,我们可以写一个 pace
运算符来做你想做的事:
extension Publisher {
func pace<Context: Scheduler, MySubject: Subject>(
_ pace: Context.SchedulerTimeType.Stride, scheduler: Context, subject: MySubject)
-> AnyCancellable
where MySubject.Output == Output, MySubject.Failure == Failure
{
return step {
switch [=11=] {
case .input(let input, let promise):
// Send the input from upstream now.
subject.send(input)
// Wait for the pace interval to elapse before requesting the
// next input from upstream.
scheduler.schedule(after: scheduler.now.advanced(by: pace)) {
promise(.more)
}
case .completion(let completion):
subject.send(completion: completion)
}
}
}
}
此 pace
运算符采用 pace
(输出之间所需的间隔)、一个用于安排事件的调度程序以及一个用于重新发布上游输入的 subject
。它通过 subject
发送每个输入来处理每个输入,然后在从上游请求下一个输入之前使用调度程序等待步速间隔。
现在我们只需要实现 step
运算符。 Combine 在这里没有给我们太多帮助。它确实有一个称为“背压”的特性,这意味着发布者不能向下游发送输入,直到下游通过向上游发送 Subscribers.Demand
来请求它。通常您会看到下游向上游发送 .unlimited
需求,但我们不会这样做。相反,我们将利用背压。在步进器完成承诺之前,我们不会向上游发送任何需求,然后我们只会发送 .max(1)
的需求,因此我们使上游与步进器同步运行。 (我们还要发送一个.max(1)
的初始需求来启动整个过程。)
好的,所以需要实现一个带有步进函数并符合Subscriber
的类型。最好查看 Reactive Streams JVM Specification,因为 Combine 是基于该规范的。
实现困难的原因是有几件事可以异步调用我们的订阅者:
- 上游可以从任何线程调用订阅者(但需要序列化其调用)。
- 在我们为步进器提供承诺函数后,步进器可以在任何线程上调用这些承诺。
- 我们希望订阅可以取消,并且取消可以发生在任何线程上。
- 所有这些异步性意味着我们必须用锁来保护我们的内部状态。
- 我们必须小心,不要在持有那个锁的同时调用,以避免死锁。
我们还将通过为每个承诺提供唯一 ID 来保护订阅者免受涉及重复调用承诺或调用过时承诺的恶作剧。
这是我们的基本订户定义:
import Combine
import Foundation
public class SteppingSubscriber<Input, Failure: Error> {
public init(stepper: @escaping Stepper) {
l_state = .subscribing(stepper)
}
public typealias Stepper = (Event) -> ()
public enum Event {
case input(Input, Promise)
case completion(Completion)
}
public typealias Promise = (Request) -> ()
public enum Request {
case more
case cancel
}
public typealias Completion = Subscribers.Completion<Failure>
private let lock = NSLock()
// The l_ prefix means it must only be accessed while holding the lock.
private var l_state: State
private var l_nextPromiseId: PromiseId = 1
private typealias PromiseId = Int
private var noPromiseId: PromiseId { 0 }
}
请注意,我将之前的辅助类型(StepEvent
、StepPromise
和 StepPromiseRequest
)移到了 SteppingSubscriber
中并缩短了它们的名称。
现在让我们考虑 l_state
的神秘类型 State
。我们的订阅者可能处于哪些不同状态?
- 我们可能正在等待从上游接收
Subscription
对象。
- 我们可以从上游接收到
Subscription
并等待信号(来自上游的输入或完成,或者来自步进器的承诺的完成)。
- 我们可能正在调用步进器,我们要小心以防它在我们调用它时完成承诺。
- 我们可能已被取消或已从上游收到完成。
所以这是我们对 State
的定义:
extension SteppingSubscriber {
private enum State {
// Completed or cancelled.
case dead
// Waiting for Subscription from upstream.
case subscribing(Stepper)
// Waiting for a signal from upstream or for the latest promise to be completed.
case subscribed(Subscribed)
// Calling out to the stopper.
case stepping(Stepping)
var subscription: Subscription? {
switch self {
case .dead: return nil
case .subscribing(_): return nil
case .subscribed(let subscribed): return subscribed.subscription
case .stepping(let stepping): return stepping.subscribed.subscription
}
}
struct Subscribed {
var stepper: Stepper
var subscription: Subscription
var validPromiseId: PromiseId
}
struct Stepping {
var subscribed: Subscribed
// If the stepper completes the current promise synchronously with .more,
// I set this to true.
var shouldRequestMore: Bool
}
}
}
由于我们使用 NSLock
(为简单起见),让我们定义一个扩展以确保我们始终将锁定与解锁相匹配:
fileprivate extension NSLock {
@inline(__always)
func sync<Answer>(_ body: () -> Answer) -> Answer {
lock()
defer { unlock() }
return body()
}
}
现在我们已准备好处理一些事件。最容易处理的事件是异步取消,这是 Cancellable
协议的唯一要求。如果我们处于 .dead
以外的任何状态,我们希望成为 .dead
并且,如果有上游订阅,则取消它。
extension SteppingSubscriber: Cancellable {
public func cancel() {
let sub: Subscription? = lock.sync {
defer { l_state = .dead }
return l_state.subscription
}
sub?.cancel()
}
}
注意这里我不想在 lock
被锁定时调用上游订阅的 cancel
函数,因为 lock
不是递归锁而且我不'想冒险陷入僵局。 lock.sync
的所有使用都遵循将任何调用推迟到锁定解锁之后的模式。
现在让我们实现 Subscriber
协议要求。首先,让我们处理从上游接收 Subscription
的问题。唯一应该发生的情况是当我们处于 .subscribing
状态时,但 .dead
也是可能的,在这种情况下我们只想取消上游订阅。
extension SteppingSubscriber: Subscriber {
public func receive(subscription: Subscription) {
let action: () -> () = lock.sync {
guard case .subscribing(let stepper) = l_state else {
return { subscription.cancel() }
}
l_state = .subscribed(.init(stepper: stepper, subscription: subscription, validPromiseId: noPromiseId))
return { subscription.request(.max(1)) }
}
action()
}
请注意,在使用 lock.sync
时(以及所有后续使用中),我 return 一个“动作”闭包,因此我可以在锁解锁后执行任意调用。
我们要解决的下一个 Subscriber
协议要求是接收完成:
public func receive(completion: Subscribers.Completion<Failure>) {
let action: (() -> ())? = lock.sync {
// The only state in which I have to handle this call is .subscribed:
// - If I'm .dead, either upstream already completed (and shouldn't call this again),
// or I've been cancelled.
// - If I'm .subscribing, upstream must send me a Subscription before sending me a completion.
// - If I'm .stepping, upstream is currently signalling me and isn't allowed to signal
// me again concurrently.
guard case .subscribed(let subscribed) = l_state else {
return nil
}
l_state = .dead
return { [stepper = subscribed.stepper] in
stepper(.completion(completion))
}
}
action?()
}
我们最复杂的 Subscriber
协议要求是接收 Input
:
- 我们必须做出承诺。
- 我们必须将承诺传递给步进器。
- 步进器可以在 returning 之前完成承诺。
- 在 stepper returns 之后,我们必须检查它是否完成了
.more
的承诺,如果是,return 上游的适当需求。
由于我们必须在这项工作的中间调出步进器,我们有一些丑陋的 lock.sync
调用嵌套。
public func receive(_ input: Input) -> Subscribers.Demand {
let action: (() -> Subscribers.Demand)? = lock.sync {
// The only state in which I have to handle this call is .subscribed:
// - If I'm .dead, either upstream completed and shouldn't call this,
// or I've been cancelled.
// - If I'm .subscribing, upstream must send me a Subscription before sending me Input.
// - If I'm .stepping, upstream is currently signalling me and isn't allowed to
// signal me again concurrently.
guard case .subscribed(var subscribed) = l_state else {
return nil
}
let promiseId = l_nextPromiseId
l_nextPromiseId += 1
let promise: Promise = { request in
self.completePromise(id: promiseId, request: request)
}
subscribed.validPromiseId = promiseId
l_state = .stepping(.init(subscribed: subscribed, shouldRequestMore: false))
return { [stepper = subscribed.stepper] in
stepper(.input(input, promise))
let demand: Subscribers.Demand = self.lock.sync {
// The only possible states now are .stepping and .dead.
guard case .stepping(let stepping) = self.l_state else {
return .none
}
self.l_state = .subscribed(stepping.subscribed)
return stepping.shouldRequestMore ? .max(1) : .none
}
return demand
}
}
return action?() ?? .none
}
} // end of extension SteppingSubscriber: Publisher
我们的订户需要处理的最后一件事是完成承诺。由于以下几个原因,这很复杂:
- 我们想要防止一个承诺被多次完成。
- 我们想防止旧的承诺被完成。
- 当承诺完成时,我们可以处于任何状态。
因此:
extension SteppingSubscriber {
private func completePromise(id: PromiseId, request: Request) {
let action: (() -> ())? = lock.sync {
switch l_state {
case .dead, .subscribing(_): return nil
case .subscribed(var subscribed) where subscribed.validPromiseId == id && request == .more:
subscribed.validPromiseId = noPromiseId
l_state = .subscribed(subscribed)
return { [sub = subscribed.subscription] in
sub.request(.max(1))
}
case .subscribed(let subscribed) where subscribed.validPromiseId == id && request == .cancel:
l_state = .dead
return { [sub = subscribed.subscription] in
sub.cancel()
}
case .subscribed(_):
// Multiple completion or stale promise.
return nil
case .stepping(var stepping) where stepping.subscribed.validPromiseId == id && request == .more:
stepping.subscribed.validPromiseId = noPromiseId
stepping.shouldRequestMore = true
l_state = .stepping(stepping)
return nil
case .stepping(let stepping) where stepping.subscribed.validPromiseId == id && request == .cancel:
l_state = .dead
return { [sub = stepping.subscribed.subscription] in
sub.cancel()
}
case .stepping(_):
// Multiple completion or stale promise.
return nil
}
}
action?()
}
}
哇哦!
完成所有这些后,我们可以编写真正的 step
运算符:
extension Publisher {
func step(with stepper: @escaping (SteppingSubscriber<Output, Failure>.Event) -> ()) -> AnyCancellable {
let subscriber = SteppingSubscriber<Output, Failure>(stepper: stepper)
self.subscribe(subscriber)
return .init(subscriber)
}
}
然后我们可以尝试上面的 pace
运算符。由于我们不在 SteppingSubscriber
中进行任何缓冲,并且上游通常不进行缓冲,因此我们将在上游和我们的 pace
运算符之间插入一个 buffer
。
var cans: [AnyCancellable] = []
func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool {
let erratic = Just("A").delay(for: 0.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher()
.merge(with: Just("B").delay(for: 0.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("C").delay(for: 0.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("D").delay(for: 5.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("E").delay(for: 5.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("F").delay(for: 5.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.handleEvents(
receiveOutput: { print("erratic: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \([=21=])") },
receiveCompletion: { print("erratic: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \([=21=])") }
)
.makeConnectable()
let subject = PassthroughSubject<String, Never>()
cans += [erratic
.buffer(size: 1000, prefetch: .byRequest, whenFull: .dropOldest)
.pace(.seconds(1), scheduler: DispatchQueue.main, subject: subject)]
cans += [subject.sink(
receiveCompletion: { print("paced: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \([=21=])") },
receiveValue: { print("paced: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \([=21=])") }
)]
let c = erratic.connect()
cans += [AnyCancellable { c.cancel() }]
return true
}
最后,这里是输出:
erratic: 223394.17115897 A
paced: 223394.171495405 A
erratic: 223394.408086369 B
erratic: 223394.739186984 C
paced: 223395.171615624 B
paced: 223396.27056174 C
erratic: 223399.536717127 D
paced: 223399.536782847 D
erratic: 223399.536834495 E
erratic: 223400.236808469 F
erratic: 223400.236886323 finished
paced: 223400.620542561 E
paced: 223401.703613078 F
paced: 223402.703828512 finished
- 时间戳以秒为单位。
- 古怪的出版商的时间确实古怪,有时甚至会在时间上接近。
- 节奏计时始终至少相隔一秒,即使不稳定事件发生相隔不到一秒。
- 当一个不稳定事件在前一个事件发生超过一秒后发生时,节奏事件将在不稳定事件之后立即发送,不会进一步延迟。
- 节奏完成发生在最后一个节奏事件之后一秒,即使不稳定完成发生在最后一个不稳定事件之后。
buffer
在发送最后一个事件后收到另一个请求之前不会发送完成,并且该请求被步调计时器延迟。
为了简单 copy/paste.
,我已将 step
运算符的整个实现放在 this gist 中
只是想提一下,我改编了 Rob 之前的回答并将其转换为自定义 Publisher,以便允许单个不间断的管道(请参阅他的解决方案下方的评论)。我的改编在下面,但所有的功劳仍然归于他。它还使用 Rob 的 step
运算符和 SteppingSubscriber
,因为此自定义发布者在内部使用它们。
编辑:作为 modulated
运算符的一部分使用缓冲区更新,否则需要附加缓冲区以缓冲上游事件。
public extension Publisher {
func modulated<Context: Scheduler>(_ pace: Context.SchedulerTimeType.Stride, scheduler: Context) -> AnyPublisher<Output, Failure> {
let upstream = buffer(size: 1000, prefetch: .byRequest, whenFull: .dropNewest).eraseToAnyPublisher()
return PacePublisher<Context, AnyPublisher>(pace: pace, scheduler: scheduler, source: upstream).eraseToAnyPublisher()
}
}
final class PacePublisher<Context: Scheduler, Source: Publisher>: Publisher {
typealias Output = Source.Output
typealias Failure = Source.Failure
let subject: PassthroughSubject<Output, Failure>
let scheduler: Context
let pace: Context.SchedulerTimeType.Stride
lazy var internalSubscriber: SteppingSubscriber<Output, Failure> = SteppingSubscriber<Output, Failure>(stepper: stepper)
lazy var stepper: ((SteppingSubscriber<Output, Failure>.Event) -> ()) = {
switch [=10=] {
case .input(let input, let promise):
// Send the input from upstream now.
self.subject.send(input)
// Wait for the pace interval to elapse before requesting the
// next input from upstream.
self.scheduler.schedule(after: self.scheduler.now.advanced(by: self.pace)) {
promise(.more)
}
case .completion(let completion):
self.subject.send(completion: completion)
}
}
init(pace: Context.SchedulerTimeType.Stride, scheduler: Context, source: Source) {
self.scheduler = scheduler
self.pace = pace
self.subject = PassthroughSubject<Source.Output, Source.Failure>()
source.subscribe(internalSubscriber)
}
public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
subject.subscribe(subscriber)
subject.send(subscription: PaceSubscription(subscriber: subscriber))
}
}
public class PaceSubscription<S: Subscriber>: Subscription {
private var subscriber: S?
init(subscriber: S) {
self.subscriber = subscriber
}
public func request(_ demand: Subscribers.Demand) {
}
public func cancel() {
subscriber = nil
}
}
Publishers.CollectByTime
在这里有什么用处吗?
Publishers.CollectByTime(upstream: upstreamPublisher.share(), strategy: Publishers.TimeGroupingStrategy.byTime(RunLoop.main, .seconds(1)), options: nil)
编辑
下面概述的原始方法有一种更简单的方法,它不需要起搏器,而是使用由 flatMap(maxPublishers: .max(1))
创建的背压。
flatMap
发送 1 的需求,直到其返回的发布者(我们可以延迟)完成。我们需要 Buffer
上游发布者来缓冲这些值。
// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject<Date, Never>()
let interval = 1.0
let pub = subject
.buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
.flatMap(maxPublishers: .max(1)) {
Just([=10=])
.delay(for: .seconds(interval), scheduler: DispatchQueue.main)
}
原版
我知道这是一个老问题,但我认为有更简单的方法来实现它,所以我想分享一下。
这个想法类似于 .zip
和 Timer
,除了 Timer
,你会 .zip
从先前发送的值,可以使用 CurrentValueSubject
实现。需要 CurrentValueSubject
而不是 PassthroughSubject
才能播种第一个“tick”。
// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject<Date, Never>()
let pacer = CurrentValueSubject<Void, Never>(())
let interval = 1.0
let pub = subject.zip(pacer)
.flatMap { v in
Just(v.0) // extract the original value
.delay(for: .seconds(interval), scheduler: DispatchQueue.main)
.handleEvents(receiveOutput: { _ in
pacer.send() // send the pacer "tick" after the interval
})
}
发生的事情是 .zip
起搏器上的门,它仅在先前发送的值延迟后到达。
如果下一个值早于允许的时间间隔,它会等待起搏器。
但是,如果下一个值晚出现,那么起搏器已经有一个新值可以立即提供,因此不会有延迟。
如果您像在测试用例中那样使用它:
let c = pub.sink { print("\([=12=]): \(Date())") }
subject.send(Date())
subject.send(Date())
subject.send(Date())
DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) {
subject.send(Date())
subject.send(Date())
}
DispatchQueue.main.asyncAfter(deadline: .now() + 10.0) {
subject.send(Date())
subject.send(Date())
}
结果会是这样的:
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:21 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:22 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:23 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:24 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:25 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:32 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:33 +0000
在 iOS 13.
中使用新的 Combine 框架假设我有一个上游发布者以非常不规则的速率发送值 - 有时几秒钟或几分钟可能没有任何值,然后可能会立即出现一连串的值。我想创建一个自定义发布者,它订阅上游值,缓冲它们并在它们进入时以已知的常规节奏发出它们,但如果它们都用完了则不发布任何内容。
具体例子:
- t = 0 到 5000 毫秒:未发布上游值
- t = 5001ms:上游发布"a"
- t = 5002ms:上游发布"b"
- t = 5003ms:上游发布"c"
- t = 5004ms 到 10000ms:没有发布上游值
- t = 10001ms:上游发布"d"
我订阅上游的发布者每 1 秒产生一次值:
- t = 0 到 5000 毫秒:未发布任何值
- t = 5001ms:发布 "a"
- t = 6001ms:发布 "b"
- t = 7001ms:发布 "c"
- t = 7001ms 到 10001ms:未发布值
- t = 10001ms:发布 "d"
None 现有发布者或运营商似乎 相当 在这里做我想做的事。
throttle
anddebounce
将简单地以特定节奏对上游值进行采样并丢弃缺失的值(例如,如果节奏为 1000 毫秒,则只会发布 "a")delay
会为每个值添加相同的延迟,但不会 space 它们出来(例如,如果我的延迟是 1000 毫秒,它会在 6001 毫秒发布 "a","b" 在 6002 毫秒,"c" 在 6003 毫秒)buffer
似乎很有前途,但我不太清楚如何使用它——如何强制它按需从缓冲区发布一个值。当我将接收器连接到buffer
时,它似乎只是立即发布所有值,根本没有缓冲。
我考虑过使用某种组合运算符,如 zip
或 merge
或 combineLatest
并将其与计时器发布器组合,这可能是正确的方法,但我可以弄清楚如何配置它以提供我想要的行为。
编辑
这是一张大理石图,希望能说明我的意图:
Upstream Publisher:
-A-B-C-------------------D-E-F--------|>
My Custom Operator:
-A----B----C-------------D----E----F--|>
编辑 2:单元测试
这是一个单元测试,如果 modulatedPublisher
(我想要的缓冲发布者)按预期工作,它应该会通过。它并不完美,但它会在收到事件(包括收到的时间)时存储它们,然后比较事件之间的时间间隔,确保它们不小于所需的间隔。
func testCustomPublisher() {
let expectation = XCTestExpectation(description: "async")
var events = [Event]()
let passthroughSubject = PassthroughSubject<Int, Never>()
let cancellable = passthroughSubject
.modulatedPublisher(interval: 1.0)
.sink { value in
events.append(Event(value: value, date: Date()))
print("value received: \(value) at \(self.dateFormatter.string(from:Date()))")
}
// WHEN I send 3 events, wait 6 seconds, and send 3 more events
passthroughSubject.send(1)
passthroughSubject.send(2)
passthroughSubject.send(3)
DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(6000)) {
passthroughSubject.send(4)
passthroughSubject.send(5)
passthroughSubject.send(6)
DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(4000)) {
// THEN I expect the stored events to be no closer together in time than the interval of 1.0s
for i in 1 ..< events.count {
let interval = events[i].date.timeIntervalSince(events[i-1].date)
print("Interval: \(interval)")
// There's some small error in the interval but it should be about 1 second since I'm using a 1s modulated publisher.
XCTAssertTrue(interval > 0.99)
}
expectation.fulfill()
}
}
wait(for: [expectation], timeout: 15)
}
我最接近的是使用 zip
,像这样:
public extension Publisher where Self.Failure == Never {
func modulatedPublisher(interval: TimeInterval) -> AnyPublisher<Output, Never> {
let timerBuffer = Timer
.publish(every: interval, on: .main, in: .common)
.autoconnect()
return timerBuffer
.zip(self, { }) // should emit one input element () every timer tick
.eraseToAnyPublisher()
}
}
这会正确调整前三个事件(1、2 和 3),但不会调整后三个事件(4、5 和 6)。输出:
value received: 1 at 3:54:07.0007
value received: 2 at 3:54:08.0008
value received: 3 at 3:54:09.0009
value received: 4 at 3:54:12.0012
value received: 5 at 3:54:12.0012
value received: 6 at 3:54:12.0012
我相信这是因为 zip
有一些内部缓冲能力。前三个上游事件被缓冲并按照定时器的节奏发出,但是在 6 秒等待期间,定时器的事件被缓冲 - 当第二个设置上游事件被触发时,已经有定时器事件在队列中等待,所以它们配对并立即发射。
这是一个有趣的问题。我玩过 Timer.publish
、buffer
、zip
和 throttle
的各种组合,但我无法让任何组合按照您想要的方式工作。因此,让我们编写一个自定义订阅者。
我们真正想要的是 API 当我们从上游获得输入时,我们还能够控制上游何时交付下一个输入。像这样:
extension Publisher {
/// Subscribe to me with a stepping function.
/// - parameter stepper: A function I'll call with each of my inputs, and with my completion.
/// Each time I call this function with an input, I also give it a promise function.
/// I won't deliver the next input until the promise is called with a `.more` argument.
/// - returns: An object you can use to cancel the subscription asynchronously.
func step(with stepper: @escaping (StepEvent<Output, Failure>) -> ()) -> AnyCancellable {
???
}
}
enum StepEvent<Input, Failure: Error> {
/// Handle the Input. Call `StepPromise` when you're ready for the next Input,
/// or to cancel the subscription.
case input(Input, StepPromise)
/// Upstream completed the subscription.
case completion(Subscribers.Completion<Failure>)
}
/// The type of callback given to the stepper function to allow it to continue
/// or cancel the stream.
typealias StepPromise = (StepPromiseRequest) -> ()
enum StepPromiseRequest {
// Pass this to the promise to request the next item from upstream.
case more
// Pass this to the promise to cancel the subscription.
case cancel
}
有了这个 step
API,我们可以写一个 pace
运算符来做你想做的事:
extension Publisher {
func pace<Context: Scheduler, MySubject: Subject>(
_ pace: Context.SchedulerTimeType.Stride, scheduler: Context, subject: MySubject)
-> AnyCancellable
where MySubject.Output == Output, MySubject.Failure == Failure
{
return step {
switch [=11=] {
case .input(let input, let promise):
// Send the input from upstream now.
subject.send(input)
// Wait for the pace interval to elapse before requesting the
// next input from upstream.
scheduler.schedule(after: scheduler.now.advanced(by: pace)) {
promise(.more)
}
case .completion(let completion):
subject.send(completion: completion)
}
}
}
}
此 pace
运算符采用 pace
(输出之间所需的间隔)、一个用于安排事件的调度程序以及一个用于重新发布上游输入的 subject
。它通过 subject
发送每个输入来处理每个输入,然后在从上游请求下一个输入之前使用调度程序等待步速间隔。
现在我们只需要实现 step
运算符。 Combine 在这里没有给我们太多帮助。它确实有一个称为“背压”的特性,这意味着发布者不能向下游发送输入,直到下游通过向上游发送 Subscribers.Demand
来请求它。通常您会看到下游向上游发送 .unlimited
需求,但我们不会这样做。相反,我们将利用背压。在步进器完成承诺之前,我们不会向上游发送任何需求,然后我们只会发送 .max(1)
的需求,因此我们使上游与步进器同步运行。 (我们还要发送一个.max(1)
的初始需求来启动整个过程。)
好的,所以需要实现一个带有步进函数并符合Subscriber
的类型。最好查看 Reactive Streams JVM Specification,因为 Combine 是基于该规范的。
实现困难的原因是有几件事可以异步调用我们的订阅者:
- 上游可以从任何线程调用订阅者(但需要序列化其调用)。
- 在我们为步进器提供承诺函数后,步进器可以在任何线程上调用这些承诺。
- 我们希望订阅可以取消,并且取消可以发生在任何线程上。
- 所有这些异步性意味着我们必须用锁来保护我们的内部状态。
- 我们必须小心,不要在持有那个锁的同时调用,以避免死锁。
我们还将通过为每个承诺提供唯一 ID 来保护订阅者免受涉及重复调用承诺或调用过时承诺的恶作剧。
这是我们的基本订户定义:
import Combine
import Foundation
public class SteppingSubscriber<Input, Failure: Error> {
public init(stepper: @escaping Stepper) {
l_state = .subscribing(stepper)
}
public typealias Stepper = (Event) -> ()
public enum Event {
case input(Input, Promise)
case completion(Completion)
}
public typealias Promise = (Request) -> ()
public enum Request {
case more
case cancel
}
public typealias Completion = Subscribers.Completion<Failure>
private let lock = NSLock()
// The l_ prefix means it must only be accessed while holding the lock.
private var l_state: State
private var l_nextPromiseId: PromiseId = 1
private typealias PromiseId = Int
private var noPromiseId: PromiseId { 0 }
}
请注意,我将之前的辅助类型(StepEvent
、StepPromise
和 StepPromiseRequest
)移到了 SteppingSubscriber
中并缩短了它们的名称。
现在让我们考虑 l_state
的神秘类型 State
。我们的订阅者可能处于哪些不同状态?
- 我们可能正在等待从上游接收
Subscription
对象。 - 我们可以从上游接收到
Subscription
并等待信号(来自上游的输入或完成,或者来自步进器的承诺的完成)。 - 我们可能正在调用步进器,我们要小心以防它在我们调用它时完成承诺。
- 我们可能已被取消或已从上游收到完成。
所以这是我们对 State
的定义:
extension SteppingSubscriber {
private enum State {
// Completed or cancelled.
case dead
// Waiting for Subscription from upstream.
case subscribing(Stepper)
// Waiting for a signal from upstream or for the latest promise to be completed.
case subscribed(Subscribed)
// Calling out to the stopper.
case stepping(Stepping)
var subscription: Subscription? {
switch self {
case .dead: return nil
case .subscribing(_): return nil
case .subscribed(let subscribed): return subscribed.subscription
case .stepping(let stepping): return stepping.subscribed.subscription
}
}
struct Subscribed {
var stepper: Stepper
var subscription: Subscription
var validPromiseId: PromiseId
}
struct Stepping {
var subscribed: Subscribed
// If the stepper completes the current promise synchronously with .more,
// I set this to true.
var shouldRequestMore: Bool
}
}
}
由于我们使用 NSLock
(为简单起见),让我们定义一个扩展以确保我们始终将锁定与解锁相匹配:
fileprivate extension NSLock {
@inline(__always)
func sync<Answer>(_ body: () -> Answer) -> Answer {
lock()
defer { unlock() }
return body()
}
}
现在我们已准备好处理一些事件。最容易处理的事件是异步取消,这是 Cancellable
协议的唯一要求。如果我们处于 .dead
以外的任何状态,我们希望成为 .dead
并且,如果有上游订阅,则取消它。
extension SteppingSubscriber: Cancellable {
public func cancel() {
let sub: Subscription? = lock.sync {
defer { l_state = .dead }
return l_state.subscription
}
sub?.cancel()
}
}
注意这里我不想在 lock
被锁定时调用上游订阅的 cancel
函数,因为 lock
不是递归锁而且我不'想冒险陷入僵局。 lock.sync
的所有使用都遵循将任何调用推迟到锁定解锁之后的模式。
现在让我们实现 Subscriber
协议要求。首先,让我们处理从上游接收 Subscription
的问题。唯一应该发生的情况是当我们处于 .subscribing
状态时,但 .dead
也是可能的,在这种情况下我们只想取消上游订阅。
extension SteppingSubscriber: Subscriber {
public func receive(subscription: Subscription) {
let action: () -> () = lock.sync {
guard case .subscribing(let stepper) = l_state else {
return { subscription.cancel() }
}
l_state = .subscribed(.init(stepper: stepper, subscription: subscription, validPromiseId: noPromiseId))
return { subscription.request(.max(1)) }
}
action()
}
请注意,在使用 lock.sync
时(以及所有后续使用中),我 return 一个“动作”闭包,因此我可以在锁解锁后执行任意调用。
我们要解决的下一个 Subscriber
协议要求是接收完成:
public func receive(completion: Subscribers.Completion<Failure>) {
let action: (() -> ())? = lock.sync {
// The only state in which I have to handle this call is .subscribed:
// - If I'm .dead, either upstream already completed (and shouldn't call this again),
// or I've been cancelled.
// - If I'm .subscribing, upstream must send me a Subscription before sending me a completion.
// - If I'm .stepping, upstream is currently signalling me and isn't allowed to signal
// me again concurrently.
guard case .subscribed(let subscribed) = l_state else {
return nil
}
l_state = .dead
return { [stepper = subscribed.stepper] in
stepper(.completion(completion))
}
}
action?()
}
我们最复杂的 Subscriber
协议要求是接收 Input
:
- 我们必须做出承诺。
- 我们必须将承诺传递给步进器。
- 步进器可以在 returning 之前完成承诺。
- 在 stepper returns 之后,我们必须检查它是否完成了
.more
的承诺,如果是,return 上游的适当需求。
由于我们必须在这项工作的中间调出步进器,我们有一些丑陋的 lock.sync
调用嵌套。
public func receive(_ input: Input) -> Subscribers.Demand {
let action: (() -> Subscribers.Demand)? = lock.sync {
// The only state in which I have to handle this call is .subscribed:
// - If I'm .dead, either upstream completed and shouldn't call this,
// or I've been cancelled.
// - If I'm .subscribing, upstream must send me a Subscription before sending me Input.
// - If I'm .stepping, upstream is currently signalling me and isn't allowed to
// signal me again concurrently.
guard case .subscribed(var subscribed) = l_state else {
return nil
}
let promiseId = l_nextPromiseId
l_nextPromiseId += 1
let promise: Promise = { request in
self.completePromise(id: promiseId, request: request)
}
subscribed.validPromiseId = promiseId
l_state = .stepping(.init(subscribed: subscribed, shouldRequestMore: false))
return { [stepper = subscribed.stepper] in
stepper(.input(input, promise))
let demand: Subscribers.Demand = self.lock.sync {
// The only possible states now are .stepping and .dead.
guard case .stepping(let stepping) = self.l_state else {
return .none
}
self.l_state = .subscribed(stepping.subscribed)
return stepping.shouldRequestMore ? .max(1) : .none
}
return demand
}
}
return action?() ?? .none
}
} // end of extension SteppingSubscriber: Publisher
我们的订户需要处理的最后一件事是完成承诺。由于以下几个原因,这很复杂:
- 我们想要防止一个承诺被多次完成。
- 我们想防止旧的承诺被完成。
- 当承诺完成时,我们可以处于任何状态。
因此:
extension SteppingSubscriber {
private func completePromise(id: PromiseId, request: Request) {
let action: (() -> ())? = lock.sync {
switch l_state {
case .dead, .subscribing(_): return nil
case .subscribed(var subscribed) where subscribed.validPromiseId == id && request == .more:
subscribed.validPromiseId = noPromiseId
l_state = .subscribed(subscribed)
return { [sub = subscribed.subscription] in
sub.request(.max(1))
}
case .subscribed(let subscribed) where subscribed.validPromiseId == id && request == .cancel:
l_state = .dead
return { [sub = subscribed.subscription] in
sub.cancel()
}
case .subscribed(_):
// Multiple completion or stale promise.
return nil
case .stepping(var stepping) where stepping.subscribed.validPromiseId == id && request == .more:
stepping.subscribed.validPromiseId = noPromiseId
stepping.shouldRequestMore = true
l_state = .stepping(stepping)
return nil
case .stepping(let stepping) where stepping.subscribed.validPromiseId == id && request == .cancel:
l_state = .dead
return { [sub = stepping.subscribed.subscription] in
sub.cancel()
}
case .stepping(_):
// Multiple completion or stale promise.
return nil
}
}
action?()
}
}
哇哦!
完成所有这些后,我们可以编写真正的 step
运算符:
extension Publisher {
func step(with stepper: @escaping (SteppingSubscriber<Output, Failure>.Event) -> ()) -> AnyCancellable {
let subscriber = SteppingSubscriber<Output, Failure>(stepper: stepper)
self.subscribe(subscriber)
return .init(subscriber)
}
}
然后我们可以尝试上面的 pace
运算符。由于我们不在 SteppingSubscriber
中进行任何缓冲,并且上游通常不进行缓冲,因此我们将在上游和我们的 pace
运算符之间插入一个 buffer
。
var cans: [AnyCancellable] = []
func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool {
let erratic = Just("A").delay(for: 0.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher()
.merge(with: Just("B").delay(for: 0.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("C").delay(for: 0.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("D").delay(for: 5.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("E").delay(for: 5.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("F").delay(for: 5.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.handleEvents(
receiveOutput: { print("erratic: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \([=21=])") },
receiveCompletion: { print("erratic: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \([=21=])") }
)
.makeConnectable()
let subject = PassthroughSubject<String, Never>()
cans += [erratic
.buffer(size: 1000, prefetch: .byRequest, whenFull: .dropOldest)
.pace(.seconds(1), scheduler: DispatchQueue.main, subject: subject)]
cans += [subject.sink(
receiveCompletion: { print("paced: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \([=21=])") },
receiveValue: { print("paced: \(Double(DispatchTime.now().rawValue) / 1_000_000_000) \([=21=])") }
)]
let c = erratic.connect()
cans += [AnyCancellable { c.cancel() }]
return true
}
最后,这里是输出:
erratic: 223394.17115897 A
paced: 223394.171495405 A
erratic: 223394.408086369 B
erratic: 223394.739186984 C
paced: 223395.171615624 B
paced: 223396.27056174 C
erratic: 223399.536717127 D
paced: 223399.536782847 D
erratic: 223399.536834495 E
erratic: 223400.236808469 F
erratic: 223400.236886323 finished
paced: 223400.620542561 E
paced: 223401.703613078 F
paced: 223402.703828512 finished
- 时间戳以秒为单位。
- 古怪的出版商的时间确实古怪,有时甚至会在时间上接近。
- 节奏计时始终至少相隔一秒,即使不稳定事件发生相隔不到一秒。
- 当一个不稳定事件在前一个事件发生超过一秒后发生时,节奏事件将在不稳定事件之后立即发送,不会进一步延迟。
- 节奏完成发生在最后一个节奏事件之后一秒,即使不稳定完成发生在最后一个不稳定事件之后。
buffer
在发送最后一个事件后收到另一个请求之前不会发送完成,并且该请求被步调计时器延迟。
为了简单 copy/paste.
,我已将step
运算符的整个实现放在 this gist 中
只是想提一下,我改编了 Rob 之前的回答并将其转换为自定义 Publisher,以便允许单个不间断的管道(请参阅他的解决方案下方的评论)。我的改编在下面,但所有的功劳仍然归于他。它还使用 Rob 的 step
运算符和 SteppingSubscriber
,因为此自定义发布者在内部使用它们。
编辑:作为 modulated
运算符的一部分使用缓冲区更新,否则需要附加缓冲区以缓冲上游事件。
public extension Publisher {
func modulated<Context: Scheduler>(_ pace: Context.SchedulerTimeType.Stride, scheduler: Context) -> AnyPublisher<Output, Failure> {
let upstream = buffer(size: 1000, prefetch: .byRequest, whenFull: .dropNewest).eraseToAnyPublisher()
return PacePublisher<Context, AnyPublisher>(pace: pace, scheduler: scheduler, source: upstream).eraseToAnyPublisher()
}
}
final class PacePublisher<Context: Scheduler, Source: Publisher>: Publisher {
typealias Output = Source.Output
typealias Failure = Source.Failure
let subject: PassthroughSubject<Output, Failure>
let scheduler: Context
let pace: Context.SchedulerTimeType.Stride
lazy var internalSubscriber: SteppingSubscriber<Output, Failure> = SteppingSubscriber<Output, Failure>(stepper: stepper)
lazy var stepper: ((SteppingSubscriber<Output, Failure>.Event) -> ()) = {
switch [=10=] {
case .input(let input, let promise):
// Send the input from upstream now.
self.subject.send(input)
// Wait for the pace interval to elapse before requesting the
// next input from upstream.
self.scheduler.schedule(after: self.scheduler.now.advanced(by: self.pace)) {
promise(.more)
}
case .completion(let completion):
self.subject.send(completion: completion)
}
}
init(pace: Context.SchedulerTimeType.Stride, scheduler: Context, source: Source) {
self.scheduler = scheduler
self.pace = pace
self.subject = PassthroughSubject<Source.Output, Source.Failure>()
source.subscribe(internalSubscriber)
}
public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
subject.subscribe(subscriber)
subject.send(subscription: PaceSubscription(subscriber: subscriber))
}
}
public class PaceSubscription<S: Subscriber>: Subscription {
private var subscriber: S?
init(subscriber: S) {
self.subscriber = subscriber
}
public func request(_ demand: Subscribers.Demand) {
}
public func cancel() {
subscriber = nil
}
}
Publishers.CollectByTime
在这里有什么用处吗?
Publishers.CollectByTime(upstream: upstreamPublisher.share(), strategy: Publishers.TimeGroupingStrategy.byTime(RunLoop.main, .seconds(1)), options: nil)
编辑
下面概述的原始方法有一种更简单的方法,它不需要起搏器,而是使用由 flatMap(maxPublishers: .max(1))
创建的背压。
flatMap
发送 1 的需求,直到其返回的发布者(我们可以延迟)完成。我们需要 Buffer
上游发布者来缓冲这些值。
// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject<Date, Never>()
let interval = 1.0
let pub = subject
.buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
.flatMap(maxPublishers: .max(1)) {
Just([=10=])
.delay(for: .seconds(interval), scheduler: DispatchQueue.main)
}
原版
我知道这是一个老问题,但我认为有更简单的方法来实现它,所以我想分享一下。
这个想法类似于 .zip
和 Timer
,除了 Timer
,你会 .zip
从先前发送的值,可以使用 CurrentValueSubject
实现。需要 CurrentValueSubject
而不是 PassthroughSubject
才能播种第一个“tick”。
// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject<Date, Never>()
let pacer = CurrentValueSubject<Void, Never>(())
let interval = 1.0
let pub = subject.zip(pacer)
.flatMap { v in
Just(v.0) // extract the original value
.delay(for: .seconds(interval), scheduler: DispatchQueue.main)
.handleEvents(receiveOutput: { _ in
pacer.send() // send the pacer "tick" after the interval
})
}
发生的事情是 .zip
起搏器上的门,它仅在先前发送的值延迟后到达。
如果下一个值早于允许的时间间隔,它会等待起搏器。 但是,如果下一个值晚出现,那么起搏器已经有一个新值可以立即提供,因此不会有延迟。
如果您像在测试用例中那样使用它:
let c = pub.sink { print("\([=12=]): \(Date())") }
subject.send(Date())
subject.send(Date())
subject.send(Date())
DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) {
subject.send(Date())
subject.send(Date())
}
DispatchQueue.main.asyncAfter(deadline: .now() + 10.0) {
subject.send(Date())
subject.send(Date())
}
结果会是这样的:
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:21 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:22 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:23 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:24 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:25 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:32 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:33 +0000