如何通过重复较短的序列来压缩可观察量

How to zip observables with repeating the shorter sequence

我正在尝试弄清楚如何实现以下结果:

A: -a--b--c-d--e-f-|
B: --1-2-3-|
=: --a-b--c-d--e-f-|
 : --1-2--3-1--2-3

其中 A、B 是输入流,“=”表示输出流(作为元组 A、B)

反之亦然:

A: -a-b-|
B: --1--2-3-4--5--6-7-|
=: --a--b-a-b--a--b-a-|
 : --1--2-3-4--5--6-7

所以在纯文本中 - 我正在寻找行为类似于 zip 运算符但具有 'replaying' 较短序列匹配较长序列的能力的东西。

知道如何解决这个问题吗?


解决方案 1

@DanielT提供的解决方案(有一些问题)

extension ObservableType {
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            let lock = NSRecursiveLock()
            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let ae):
                    aa.append(ae)
                    if bComplete {
                        observer.onNext((ae, bb[(aa.count - 1) % bb.count]))
                    }
                    else if bb.count == aa.count {
                        observer.onNext((aa.last!, bb.last!))
                    }
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    if bComplete {
                        observer.onCompleted()
                    }
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let be):
                    bb.append(be)
                    if aComplete {
                        observer.onNext((aa[(bb.count - 1) % aa.count], be))
                    }
                    else if bb.count == aa.count {
                        observer.onNext((aa.last!, bb.last!))
                    }
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    if aComplete {
                        observer.onCompleted()
                    }
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}

解决方案 2

我自己的解决方案受到以下答案的启发 - 自己的操作员 (HT @DanielT) 但采用更命令的方法 (HT @iamtimmo):

extension ObservableType {
    public static func zipRepeatCollected<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A?, B?)> {
        return Observable.create { observer in

            var bufferA: [A] = []
            let aComplete = PublishSubject<Bool>()
            aComplete.onNext(false);

            var bufferB: [B] = []
            let bComplete = PublishSubject<Bool>()
            bComplete.onNext(false);

            let disposableA = a.subscribe { event in
                switch event {
                case .next(let valueA):
                    bufferA.append(valueA)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete.onNext(true)
                    aComplete.onCompleted()
                }
            }

            let disposableB = b.subscribe { event in
                switch event {
                case .next(let value):
                    bufferB.append(value)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete.onNext(true)
                    bComplete.onCompleted()
                }
            }

            let disposableZip = Observable.zip(aComplete, bComplete)
                .filter { [=13=] ==  && [=13=] == true }
                .subscribe { event in
                    switch event {
                    case .next(_, _):
                        var zippedList = Array<(A?, B?)>()

                        let lengthA = bufferA.count
                        let lengthB = bufferB.count

                        if lengthA > 0 && lengthB > 0 {
                            for i in 0 ..< max(lengthA, lengthB) {
                                let aVal = bufferA[i % lengthA]
                                let bVal = bufferB[i % lengthB]
                                zippedList.append((aVal, bVal))
                            }
                        } else if lengthA == 0 {
                            zippedList = bufferB.map { (nil, [=13=]) }
                        } else {
                            zippedList = bufferA.map { ([=13=], nil) }
                        }

                        zippedList.forEach { observer.onNext([=13=]) }
                    case .completed:
                        observer.onCompleted()
                    case .error(let e):
                        observer.onError(e)
                    }
            }

            return Disposables.create(disposableA, disposableB, disposableZip)
        }
    }
}

class ZipRepeatTests: XCTestCase {
    func testALongerThanB() {
        assertAopBEqualsE(
            a: "-a--b--c-d--e-f-|",
            b: "--1-2-3-|",
            e: "a1,b2,c3,d1,e2,f3,|")
    }

    func testAShorterThanB() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "--1-2-3-|",
            e: "a1,b2,a3,|")
    }
    func testBStartsLater() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "----1---2|",
            e: "a1,b2,|")

    }
    func testABWithConstOffset() {
        assertAopBEqualsE(
            a: "-a--b--c|",
            b: "----1--2--3--|",
            e: "a1,b2,c3,|")
    }

    func testAEndsBeforeBStarts() {
        assertAopBEqualsE(
            a: "ab|",
            b: "---1-2-3-4-|",
            e: "a1,b2,a3,b4,|")
    }

    func testACompletesWithoutValue() {
        assertAopBEqualsE(
            a: "-|",
            b: "--1-2-3-|",
            e: "1,2,3,|")
    }
    func testBCompletesWithoutValue() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "|",
            e: "a,b,|")
    }
    func testNoData() {
        assertAopBEqualsE(
            a: "-|",
            b: "|",
            e: "|")
    }

    func assertAopBEqualsE(_ scheduler: TestScheduler = TestScheduler(initialClock: 0), a: String, b: String, e: String, file: StaticString = #file, line: UInt = #line) {

        let aStream = scheduler.createColdObservable(events(a))
        let bStream = scheduler.createColdObservable(events(b))
        let eStream = expected(e)

        let bResults = scheduler.start {
            Observable<(String)>.zipRepeatCollected(aStream.asObservable(), bStream.asObservable()).map { "\([=13=] ?? "")\( ?? "")" }
        }
        XCTAssertEqual(eStream, bResults.events.map { [=13=].value }, file: file, line: line)
    }
    func expected(_ stream: String) -> [Event<String>] {
        stream.split(separator: ",").map { String([=13=]) == "|" ? .completed : .next(String([=13=])) }
    }
    func events(_ stream: String, step: Int = 10) -> [Recorded<Event<String>>] {
        var time = 0
        var events = [Recorded<Event<String>>]()
        stream.forEach { c in
            if c == "|" {
                events.append(.completed(time))
            } else if c != "-" {
                events.append(.next(time, String(c)))
            }
            time += step
        }
        return events
    }
}

如有疑问,您可以随时创建自己的运算符:

extension ObservableType {
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            let lock = NSRecursiveLock()
            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let ae):
                    aa.append(ae)
                    if bComplete {
                        observer.onNext((ae, bb[(aa.count - 1) % bb.count]))
                    }
                    else if bb.count == aa.count {
                        observer.onNext((aa.last!, bb.last!))
                    }
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    if bComplete {
                        observer.onCompleted()
                    }
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let be):
                    bb.append(be)
                    if aComplete {
                        observer.onNext((aa[(bb.count - 1) % aa.count], be))
                    }
                    else if bb.count == aa.count {
                        observer.onNext((aa.last!, bb.last!))
                    }
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    if aComplete {
                        observer.onCompleted()
                    }
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}

以及显示功能的测试:

class RxSandboxTests: XCTestCase {

    func testLongA() {
        let scheduler = TestScheduler(initialClock: 0)
        let a = scheduler.createColdObservable([.next(10, "a"), .next(20, "b"), .next(30, "c"), .next(40, "d"), .next(50, "e"), .next(60, "f"), .completed(60)])
        let b = scheduler.createColdObservable([.next(10, 1), .next(20, 2), .next(30, 3), .completed(30)])

        let bResults = scheduler.start {
            Observable<(String, Int)>.zipRepeat(a.asObservable(), b.asObservable()).map { [=11=].1 }
        }

        XCTAssertEqual(bResults.events, [.next(210, 1), .next(220, 2), .next(230, 3), .next(240, 1), .next(250, 2), .next(260, 3), .completed(260)])
    }

    func testLongB() {
        let scheduler = TestScheduler(initialClock: 0)
        let a = scheduler.createColdObservable([.next(10, "a"), .next(20, "b"), .next(30, "c"), .completed(30)])
        let b = scheduler.createColdObservable([.next(10, 1), .next(20, 2), .next(30, 3), .next(40, 4), .next(50, 5), .next(60, 6), .completed(60)])

        let aResults = scheduler.start {
            Observable<(String, Int)>.zipRepeat(a.asObservable(), b.asObservable()).map { [=11=].0 }
        }

        XCTAssertEqual(aResults.events, [.next(210, "a"), .next(220, "b"), .next(230, "c"), .next(240, "a"), .next(250, "b"), .next(260, "c"), .completed(260)])
    }
}
  • Swift 5.1 / 合并 *

嗨@DegeH,

我不知道 RxSwift,但这里有些东西可能对您有所帮助。它使用新的 Combine 框架,但您可以映射到 RxSwift。只需将其粘贴到游乐场即可。

在示例中,pubApubB 分别从 AB 流中收集字符串,并发出这些字符串的列表。一旦两个流都完成,publisher 将两个列表(通过 zip())组合成一个列表。

列表被分开并送入 zipUnalignedPublishersLists 函数,该函数将它们转换为单个元组列表。这是重复较短流的值以与较长流的值对齐的地方。所以,也许这个功能对你的帮助最大。

最后,该元组列表 flatMap 编入序列发布者,subscriber 订阅了该发布者。

最后的 DispatchQueue.main.asyncAfter() 调用只是为了确保在 subscriber 有时间在操场上完成之前执行不会结束。你不希望它出现在你的应用程序中。

import Foundation
import Combine

func zipUnalignedPublishersLists(_ listA: [String], _ listB: [String]) -> [(String, String)] {
    var zippedList = Array<(String, String)>()

    let lengthA = listA.count
    let lengthB = listB.count
    for i in 0 ..< max(lengthA, lengthB) {
        let aVal = listA[i % lengthA]
        let bVal = listB[i % lengthB]
        zippedList.append( (aVal, bVal) )
    }

    return zippedList
}

let ptsA = PassthroughSubject<String, Never>()
let ptsB = PassthroughSubject<String, Never>()

let pubA = ptsA.collect().eraseToAnyPublisher()
let pubB = ptsB.collect().eraseToAnyPublisher()

let publisher = pubA
    .zip(pubB)
    .map({ listA, listB in
        zipUnalignedPublishersLists(listA, listB)
    })
    .flatMap { [=10=].publisher }
    .eraseToAnyPublisher()

let subscriber = publisher.sink(receiveValue: { print([=10=]) })

ptsA.send("a")
ptsB.send("1")
ptsA.send("b")
ptsB.send("2")
ptsA.send("c")
ptsB.send("3")
ptsB.send(completion: .finished)
ptsA.send("d")
ptsA.send("e")
ptsA.send("f")
ptsA.send(completion: .finished)

DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) {}

这里有两个正交分量:

  1. 给定一个有限的元素列表,将它们转换为一个有限的循环元素序列。这经常出现,所以我实现了一个 ,它包装了一个元素数组,并按顺序分配它们的元素,永远重复。
  2. 一种将无限循环序列与可观察的流元素相结合的方法,将每个新元素与循环序列中的适当成员配对。

从概念上讲,重要的是要认识到循环序列 不是 可观察的。我们需要以 "pull" 的方式行事。当我们想要循环序列中的下一个元素时,我们会询问它(通过调用其 IteratorProtocol.next() 的实现)。这与 Observables 不同,它们有自己的年表,"push" 元素通过它们的运算符链。

因此,这是 map 使用循环序列中的 next() 元素对可观察对象的每个元素执行 ping 操作,如下所示:

var cyclingIterator = CycleSequence(cycling: ["a", "b", "c"]).makeIterator()

Observable.from(1...100)
    .asObservable()
    .map { ([=10=], cyclingIterator.next()) }
    .subscribe(onNext: { print([=10=]) })

打印:

(1, Optional(""))
(2, Optional(""))
(3, Optional(""))
(4, Optional(""))
(5, Optional(""))
(6, Optional(""))
(7, Optional(""))
(8, Optional(""))
(9, Optional(""))
(10, Optional(""))
...