RxSwift,计算发出了多少个相等的连续 Equatable 项目
RxSwift, count how many equal consecutive Equatable items are emitted
我有一个 Observable 发出一系列 Equatable 元素。
流可能包含相同元素的连续序列(例如在序列 [1, 1, 1, 18, 2, 2, 0, -1] 中,元素 1 重复 3 次,2 重复 2 次)。
我需要压缩序列,为每个不同的元素替换一个包含元素和原始流中存在的重复次数的元组:
1, 1, 1, 18, 21, 21, 0, -1, -8, -8, 14, 14, 14...
(1, 3), (18, 1), (21, 2), (0, 1), (-1, 1), (-8, 2), (14, 3)...
我设法用 scan
运算符计算了重复次数,但它发出了所有不应成为最终序列一部分的部分计算:
let numbers = Observable<Int>.from([
0, 0,
3, 3, 3, 3, 3,
2,
0, 0, 0,
6, 6,
])
let reps = numbers
.scan((0, 0), accumulator: {
(prev: (Int, Int), new: (Int)) in
if prev.0 == new {
return (new, prev.1 + 1)
} else {
return (new, 1)
}
})
reps.subscribe(onNext: {
print("\([=11=])")
})
// expected:
// (0, 2), (3, 5), (2, 1), (0, 3), (6, 2)
//
// result:
// (0, 1), (0, 2),
// (3, 1), (3, 2), (3, 3), (3, 4), (3, 5),
// (2, 1),
// (0, 1), (0, 2), (0, 3),
// (6, 1), (6, 2)
我想我想出了一个可能的解决方案:
创建一个新的 Optional<T>
的 Observable(其中 T
是 numbers
中的 Equatable
元素)从 numbers
并向其附加 (concat
) 一个 nil
元素。
zip
Observable 与 reps
Observable 已经生成。
filter
结果 Observable。
// step 1
let shiftedNumbers = Observable<Int?> = numbers
.skip(1)
.map({ [=10=] })
.concat(Observable<Int?>.just(nil))
// step 2 and 3
let zippedNumbers = Observable<(Int, Int)?>
.zip(counts, shortNumbers, resultSelector: {
if == nil || != [=10=].0 {
return [=10=]
} else {
return nil
}
})
.filter({
[=10=] != nil
})
zippedNumbers.subscribe(onNext: {
print([=10=]!)
})
// result:
// (0, 2), (3, 5), (2, 1), (0, 3), (6, 2)
跳过 numbers
序列的第一个元素可以 "see in advance" 下一个元素是什么,让我们有机会发出正确的元组。
您可以编写自己的运算符。这是一个示例实现。我创建了一个新的可观察对象,它订阅了 self
的事件。任何时候 self
有一个新元素,都会命中 switch
中的 .next
情况,这会进行 运行 长度的预订保持。每当遇到不同的元素、错误或完成时,就会发出分组。
extension ObservableType where Self.E: Equatable {
func runLengthEncode() -> Observable<(element: E, count: Int)> {
var lastGrouping: (element: E, count: Int)? = nil
return Observable.create { observer in
return self.subscribe { event in
switch event {
case .next(let currentElement):
if let currentGrouping = lastGrouping {
if currentGrouping.element == currentElement {
lastGrouping = (element: currentElement, count: currentGrouping.count + 1)
}
else { // This run ended, a new element was encountered.
lastGrouping = (element: currentElement, count: 1) // start a new grouping
observer.on(.next(currentGrouping)) // emit the completed grouping
}
} else {
lastGrouping = (element: currentElement, count: 1)
}
case .error(let error):
if let lastGrouping = lastGrouping { observer.on(.next(lastGrouping)) } // Emit the last unemitted grouping.
observer.on(.error(error))
case .completed:
if let lastGrouping = lastGrouping { observer.on(.next(lastGrouping)) } // Emit the last unemitted grouping.
observer.on(.completed)
}
}
}
}
}
您还可以实现免费的 运行 长度解码运算符:
extension ObservableType {
func runLengthDecode<Element>() -> Observable<Element>
where Self.E == (element: Element, count: Int) {
return Observable.create { observer in
return self.subscribe { event in
switch event {
case .next((element: let element, count: let count)):
for _ in 1...count {
observer.on(.next(element))
}
case .error(let error): observer.on(.error(error))
case .completed: observer.on(.completed)
}
}
}
}
}
测试用例:
let numbers = Observable<Int>.from([
0, 0,
3, 3, 3, 3, 3,
2,
0, 0, 0,
6, 6,
])
let runLengthEncoded = numbers.runLengthEncode()
runLengthEncoded.subscribe { print([=12=]) }
let runLengthDecoded = runLengthEncoded.runLengthDecode()
runLengthDecoded.subscribe { print([=12=]) }
我有一个 Observable 发出一系列 Equatable 元素。 流可能包含相同元素的连续序列(例如在序列 [1, 1, 1, 18, 2, 2, 0, -1] 中,元素 1 重复 3 次,2 重复 2 次)。 我需要压缩序列,为每个不同的元素替换一个包含元素和原始流中存在的重复次数的元组:
1, 1, 1, 18, 21, 21, 0, -1, -8, -8, 14, 14, 14...
(1, 3), (18, 1), (21, 2), (0, 1), (-1, 1), (-8, 2), (14, 3)...
我设法用 scan
运算符计算了重复次数,但它发出了所有不应成为最终序列一部分的部分计算:
let numbers = Observable<Int>.from([
0, 0,
3, 3, 3, 3, 3,
2,
0, 0, 0,
6, 6,
])
let reps = numbers
.scan((0, 0), accumulator: {
(prev: (Int, Int), new: (Int)) in
if prev.0 == new {
return (new, prev.1 + 1)
} else {
return (new, 1)
}
})
reps.subscribe(onNext: {
print("\([=11=])")
})
// expected:
// (0, 2), (3, 5), (2, 1), (0, 3), (6, 2)
//
// result:
// (0, 1), (0, 2),
// (3, 1), (3, 2), (3, 3), (3, 4), (3, 5),
// (2, 1),
// (0, 1), (0, 2), (0, 3),
// (6, 1), (6, 2)
我想我想出了一个可能的解决方案:
创建一个新的
Optional<T>
的 Observable(其中T
是numbers
中的Equatable
元素)从numbers
并向其附加 (concat
) 一个nil
元素。zip
Observable 与reps
Observable 已经生成。filter
结果 Observable。
// step 1
let shiftedNumbers = Observable<Int?> = numbers
.skip(1)
.map({ [=10=] })
.concat(Observable<Int?>.just(nil))
// step 2 and 3
let zippedNumbers = Observable<(Int, Int)?>
.zip(counts, shortNumbers, resultSelector: {
if == nil || != [=10=].0 {
return [=10=]
} else {
return nil
}
})
.filter({
[=10=] != nil
})
zippedNumbers.subscribe(onNext: {
print([=10=]!)
})
// result:
// (0, 2), (3, 5), (2, 1), (0, 3), (6, 2)
跳过 numbers
序列的第一个元素可以 "see in advance" 下一个元素是什么,让我们有机会发出正确的元组。
您可以编写自己的运算符。这是一个示例实现。我创建了一个新的可观察对象,它订阅了 self
的事件。任何时候 self
有一个新元素,都会命中 switch
中的 .next
情况,这会进行 运行 长度的预订保持。每当遇到不同的元素、错误或完成时,就会发出分组。
extension ObservableType where Self.E: Equatable {
func runLengthEncode() -> Observable<(element: E, count: Int)> {
var lastGrouping: (element: E, count: Int)? = nil
return Observable.create { observer in
return self.subscribe { event in
switch event {
case .next(let currentElement):
if let currentGrouping = lastGrouping {
if currentGrouping.element == currentElement {
lastGrouping = (element: currentElement, count: currentGrouping.count + 1)
}
else { // This run ended, a new element was encountered.
lastGrouping = (element: currentElement, count: 1) // start a new grouping
observer.on(.next(currentGrouping)) // emit the completed grouping
}
} else {
lastGrouping = (element: currentElement, count: 1)
}
case .error(let error):
if let lastGrouping = lastGrouping { observer.on(.next(lastGrouping)) } // Emit the last unemitted grouping.
observer.on(.error(error))
case .completed:
if let lastGrouping = lastGrouping { observer.on(.next(lastGrouping)) } // Emit the last unemitted grouping.
observer.on(.completed)
}
}
}
}
}
您还可以实现免费的 运行 长度解码运算符:
extension ObservableType {
func runLengthDecode<Element>() -> Observable<Element>
where Self.E == (element: Element, count: Int) {
return Observable.create { observer in
return self.subscribe { event in
switch event {
case .next((element: let element, count: let count)):
for _ in 1...count {
observer.on(.next(element))
}
case .error(let error): observer.on(.error(error))
case .completed: observer.on(.completed)
}
}
}
}
}
测试用例:
let numbers = Observable<Int>.from([
0, 0,
3, 3, 3, 3, 3,
2,
0, 0, 0,
6, 6,
])
let runLengthEncoded = numbers.runLengthEncode()
runLengthEncoded.subscribe { print([=12=]) }
let runLengthDecoded = runLengthEncoded.runLengthDecode()
runLengthDecoded.subscribe { print([=12=]) }