如何在没有扫描功能的情况下在 Rx 中保留状态
How to retain state in Rx without the scan function
我正在努力将我的一些视图模型移植到(粗略的)有限状态机中,因为我的 UI 倾向于很好地适应该模式(Mealy/Moore,不关心这个问题的目的)。此外,如果做得好 - 状态机真正清理测试 - 因为它们禁止某些测试排列发生。
我当前的视图模型使用 RxSwift(和 RxKotlin - 取决于应用程序),底层用例(数据库调用、网络调用等)也使用 Rx(因此我需要留下来在那个生态系统中)。
我发现 Rx 很棒,状态机很棒 --> Rx + 状态机似乎有点杂乱无章,无法完成任何重要的事情。例如,我知道我可以使用 .scan
运算符来保留一些状态,如果我的状态机是完全同步的(例如,在 Swift 中大致是这样的):
enum Event {
case event1
case event2
case event3
}
enum State {
case state1
case state2
case state3
func on(event: Event) -> State {
switch (self, event) {
case (.state1, .event1):
// Do something
return .state2
case (.state2, .event2):
// Do something
return .state3
default:
return self // (or nil, or something)
}
}
}
func foo() -> Observable<State> {
let events = Observable<Event>.of(.event1, .event2, .event3)
return events.scan(State.state1) { (currentState, event) -> State in
return currentState.on(event)
}
}
但是,如果我的 State.on
函数中的 return 是一个 Observable(比如网络调用或需要很长时间的东西,它已经在 Rx 中),我该怎么办?
enum State {
case notLoggedIn
case loggingIn
case loggedIn
case error
func on(event: Event) -> Observable<State> {
switch (self, event) {
case (.notLoggedIn, .event1):
return api.login(credentials)
.map({ (isLoggedIn) -> State in
if isLoggedIn {
return .loggedIn
}
return .error
})
.startWith(.loggingIn)
... other code ...
default:
return self
}
}
}
我试过让 .scan
运算符接收一个 Observable 累加器,但这段代码的结果是状态机被订阅或 运行 次数太多。我猜是因为它 运行s 在正在累积的可观察对象的每个状态上。
return events.scan(Observable.just(State.state1)) { (currentState, event) -> Observable<State> in
currentState.flatMap({ (innerState) -> Observable<State> in
return innerState.on(event: event)
})
}.flatMap { (states) -> Observable<State> in
return states
}
我认为,如果我能够干净利落地拉回 state
变量,最简单的实现可能如下所示:
return events.flatMapLatest({ (event) -> Observable<State> in
return self.state.on(event: event)
.do(onNext: { (state) in
self.state = state
})
})
但是,从一个私有状态变量拉入一个可观察的流,并更新它 - 好吧,它不仅丑陋,我觉得我只是在等待被并发错误击中。
编辑:根据 Sereja Bogolubov 的反馈 - 我添加了一个中继并提出了这段代码 - 仍然不是很好,但已经到了。
let relay = BehaviorRelay<State>(value: .initial)
...
func transition(from state: State, on event: Event) -> Observable<State> {
switch (state, event) {
case (.notLoggedIn, .event1):
return api.login(credentials)
.map({ (isLoggedIn) -> State in
if isLoggedIn {
return .loggedIn
}
return .error
})
.startWith(.loggingIn)
... other code ...
default:
return self
}
}
return events.withLatestFrom(relay.asObservable(), resultSelector: { (event, state) -> Observable<State> in
return self.transition(from: state, on: event)
.do(onNext: { (state) in
self.relay.accept(state)
})
}).flatMap({ (states) -> Observable<State> in
return states
})
中继(或重放主题或其他)根据状态转换的结果在 doOnNext
中更新...这仍然感觉它可能会导致并发问题,但不确定还有什么会工作。
不,您不必完全同步即可维护任意复杂状态。
是的,有一些方法可以在没有 scan
的情况下实现所需的行为。 withLatestFrom 怎么样,其中 other
是您当前的状态(即一个单独的 Observable<MyState>
,但您需要 ReplaySubject<MyState>
在引擎盖下)。
如果您需要更多详细信息,请告诉我。
概念验证,javascript:
const source = range(0, 10);
const state = new ReplaySubject(1);
const example = source.pipe(
withLatestFrom(state), // that's the way you read actual state
map(([n, currentState]) => {
state.next(n); // that's the way you change the state
return ...
})
);
请注意,更复杂的情况(如有风险的竞争条件)可能需要至少与 combineLatest 和 approp 一样复杂的东西。 Scheduler
到位。
我觉得Elm的系统在这里可以派上用场。在 Elm 中,你传递给系统的减速器不仅仅是 return 状态,它还 return 是一个 "command",在我们的例子中是 Observable<Event>
(不是RxSwift.Event,但是你的事件枚举。)这个命令没有存储在扫描的状态中,而是在扫描之外被订阅并且它的输出被反馈到扫描中(通过某种主题。)需要取消的任务会观察当前状态,并根据状态开始和停止操作。
RxSwift 生态系统中有几个库可以帮助简化这类事情。两个主要的是 ReactorKit and RxFeedback。还有其他几个...
有关我所说内容的简单示例,请查看 this gist。这种系统允许您的 Moore 机器在进入可能导致 0..n 个新输入事件的状态时触发一个动作。
我正在努力将我的一些视图模型移植到(粗略的)有限状态机中,因为我的 UI 倾向于很好地适应该模式(Mealy/Moore,不关心这个问题的目的)。此外,如果做得好 - 状态机真正清理测试 - 因为它们禁止某些测试排列发生。
我当前的视图模型使用 RxSwift(和 RxKotlin - 取决于应用程序),底层用例(数据库调用、网络调用等)也使用 Rx(因此我需要留下来在那个生态系统中)。
我发现 Rx 很棒,状态机很棒 --> Rx + 状态机似乎有点杂乱无章,无法完成任何重要的事情。例如,我知道我可以使用 .scan
运算符来保留一些状态,如果我的状态机是完全同步的(例如,在 Swift 中大致是这样的):
enum Event {
case event1
case event2
case event3
}
enum State {
case state1
case state2
case state3
func on(event: Event) -> State {
switch (self, event) {
case (.state1, .event1):
// Do something
return .state2
case (.state2, .event2):
// Do something
return .state3
default:
return self // (or nil, or something)
}
}
}
func foo() -> Observable<State> {
let events = Observable<Event>.of(.event1, .event2, .event3)
return events.scan(State.state1) { (currentState, event) -> State in
return currentState.on(event)
}
}
但是,如果我的 State.on
函数中的 return 是一个 Observable(比如网络调用或需要很长时间的东西,它已经在 Rx 中),我该怎么办?
enum State {
case notLoggedIn
case loggingIn
case loggedIn
case error
func on(event: Event) -> Observable<State> {
switch (self, event) {
case (.notLoggedIn, .event1):
return api.login(credentials)
.map({ (isLoggedIn) -> State in
if isLoggedIn {
return .loggedIn
}
return .error
})
.startWith(.loggingIn)
... other code ...
default:
return self
}
}
}
我试过让 .scan
运算符接收一个 Observable 累加器,但这段代码的结果是状态机被订阅或 运行 次数太多。我猜是因为它 运行s 在正在累积的可观察对象的每个状态上。
return events.scan(Observable.just(State.state1)) { (currentState, event) -> Observable<State> in
currentState.flatMap({ (innerState) -> Observable<State> in
return innerState.on(event: event)
})
}.flatMap { (states) -> Observable<State> in
return states
}
我认为,如果我能够干净利落地拉回 state
变量,最简单的实现可能如下所示:
return events.flatMapLatest({ (event) -> Observable<State> in
return self.state.on(event: event)
.do(onNext: { (state) in
self.state = state
})
})
但是,从一个私有状态变量拉入一个可观察的流,并更新它 - 好吧,它不仅丑陋,我觉得我只是在等待被并发错误击中。
编辑:根据 Sereja Bogolubov 的反馈 - 我添加了一个中继并提出了这段代码 - 仍然不是很好,但已经到了。
let relay = BehaviorRelay<State>(value: .initial)
...
func transition(from state: State, on event: Event) -> Observable<State> {
switch (state, event) {
case (.notLoggedIn, .event1):
return api.login(credentials)
.map({ (isLoggedIn) -> State in
if isLoggedIn {
return .loggedIn
}
return .error
})
.startWith(.loggingIn)
... other code ...
default:
return self
}
}
return events.withLatestFrom(relay.asObservable(), resultSelector: { (event, state) -> Observable<State> in
return self.transition(from: state, on: event)
.do(onNext: { (state) in
self.relay.accept(state)
})
}).flatMap({ (states) -> Observable<State> in
return states
})
中继(或重放主题或其他)根据状态转换的结果在 doOnNext
中更新...这仍然感觉它可能会导致并发问题,但不确定还有什么会工作。
不,您不必完全同步即可维护任意复杂状态。
是的,有一些方法可以在没有 scan
的情况下实现所需的行为。 withLatestFrom 怎么样,其中 other
是您当前的状态(即一个单独的 Observable<MyState>
,但您需要 ReplaySubject<MyState>
在引擎盖下)。
如果您需要更多详细信息,请告诉我。
概念验证,javascript:
const source = range(0, 10);
const state = new ReplaySubject(1);
const example = source.pipe(
withLatestFrom(state), // that's the way you read actual state
map(([n, currentState]) => {
state.next(n); // that's the way you change the state
return ...
})
);
请注意,更复杂的情况(如有风险的竞争条件)可能需要至少与 combineLatest 和 approp 一样复杂的东西。 Scheduler
到位。
我觉得Elm的系统在这里可以派上用场。在 Elm 中,你传递给系统的减速器不仅仅是 return 状态,它还 return 是一个 "command",在我们的例子中是 Observable<Event>
(不是RxSwift.Event,但是你的事件枚举。)这个命令没有存储在扫描的状态中,而是在扫描之外被订阅并且它的输出被反馈到扫描中(通过某种主题。)需要取消的任务会观察当前状态,并根据状态开始和停止操作。
RxSwift 生态系统中有几个库可以帮助简化这类事情。两个主要的是 ReactorKit and RxFeedback。还有其他几个...
有关我所说内容的简单示例,请查看 this gist。这种系统允许您的 Moore 机器在进入可能导致 0..n 个新输入事件的状态时触发一个动作。