有状态生命周期可观察
Observable with Stateful Lifecycle
一般的设计问题可以描述为:
我有一个 websocket 连接,它有一个严格的生命周期要遵守——它希望 connect
和 disconnect
被适当地调用,而且,因为它与系统对话,它使用 .在这个 websocket 连接中,我们有多个不同的订阅对象,每个对象都有一个严格的生命周期,它希望得到尊重(subscribe
和 unsubscribe
),并且它取决于这些操作的父 websocket 的状态要成功。
这是三个嵌套生命周期可观察对象的理想行为的时间表,其中 C 依赖于 B,B 又依赖于 A:
A = someInput.switchMap((i) => LifecycleObservable())
B = A.switchMap((a) => LifecycleObservable())
C = B.switchMap((b) => LifecycleObservable())
C.listen(print);
// <-- listen to c
// <-- produce [someInput]
setup A
setup B
setup C
// <-- c is produced
// <-- c is unsubscribed
teardown C
teardown B
teardown A
// <-- C is re-subscribed-to
setup A
setup B
setup C
// <-- produce [someInput]
teardown C
teardown B
teardown A
setup A
setup B
setup C
// <-- c is produced
第一个问题:这是反模式吗?我在网上找不到太多关于这种模式的信息,但这似乎是一种非常标准的东西,你会 运行 进入 observables:一些对象只有一个生命周期,而一些对象可能想要就靠那个了。
我可以使用类似这样的东西非常接近这种理想行为:
class LifecycleObservable {
static Observable<T> fromObservable<T>({
@required Observable<T> input,
@required Future<void> Function(T) setup,
@required Future<void> Function(T) teardown,
}) {
return input.asyncMap((T _input) async {
await setup(_input);
return _input;
}).switchMap((T _input) {
return Observable<T>(Observable.never()) //
.startWith(_input)
.doOnCancel(() async {
await teardown(_input);
});
});
}
}
此代码接受有状态对象流,运行在它们生成时 setup
在它们上 teardown
作为 [=20 中的子可观察对象 teardown
=]被取消。
当在最初理想化的时间轴中产生第二个 [someInput]
时出现问题:使用上面的代码我得到了一个像
这样的调用图
// <-- listen to c
// <-- produce [someInput]
setup A
setup B
setup C
// <-- c is produced
// <-- produce [someInput]
teardown A
setup A
teardown B
setup B
teardown C
setup C
// <-- c is produced
问题在于,如果 B 依赖于 A(例如从依赖于开放 websocket 传输的订阅调用 unsubscribe
),此拆卸顺序会破坏每个对象的预期生命周期(订阅尝试发送unsubscribe
通过封闭的 websocket 传输。
在我看来,很简单,可观察模式无法表达这些语义。具体来说,可观察模式不是为级联依赖设计的——父可观察对象对其子可观察对象的状态一无所知。
我用下面的 dart 代码自己解决了这个问题。我敢肯定这很糟糕,但总的来说它似乎对我有用™。
class WithLifecycle<T> {
final FutureOr<void> Function() setup;
final FutureOr<void> Function() teardown;
final T value;
final WithLifecycle parent;
List<WithLifecycle> _children = [];
bool _disposed = false;
WithLifecycle({
@required this.value,
this.setup,
this.teardown,
this.parent,
});
void addDependency(WithLifecycle child) => _children.add(child);
void removeDependency(WithLifecycle child) => _children.remove(child);
Future<void> init() async {
parent?.addDependency(this);
await setup();
}
Future<void> dispose() async {
if (_disposed) {
return;
}
_disposed = true;
for (var _child in _children) {
await _child.dispose();
}
_children.clear();
await teardown();
}
}
然后在使用可观察对象时用于创建必要的依赖链:
class LifecycleObservable {
static Observable<WithLifecycle<T>> fromObservable<T>({
@required Observable<T> value,
WithLifecycle parent,
@required Future<void> Function(T) setup,
@required Future<void> Function(T) teardown,
}) {
return value.concatMap((T _value) {
final withLifecycle = WithLifecycle<T>(
value: _value,
parent: parent,
setup: () => setup(_value),
teardown: () => teardown(_value),
);
return Observable<WithLifecycle<T>>(Observable.never())
.startWith(withLifecycle)
.doOnListen(() async {
await withLifecycle.init();
}).doOnCancel(() async {
await withLifecycle.dispose();
});
});
}
}
像
一样使用
token$ = PublishSubject();
channel$ = token$.switchMap((token) {
return LifecycleObservable.fromObservable<IOWebSocketChannel>(
value: Observable.just(IOWebSocketChannel.connect(Constants.connectionString)),
setup: (channel) async {
print("setup A ${channel.hashCode}");
},
teardown: (channel) async {
print("teardown A ${channel.hashCode}");
await channel.sink.close(status.goingAway);
});
});
streams$ = channel$.switchMap((channel) {
return LifecycleObservable.fromObservable<Stream<String>>(
parent: channel,
value: Observable.just(channel.value.stream.cast<String>()),
setup: (thing) async {
print("setup B ${thing.hashCode}");
},
teardown: (thing) async {
print("teardown B ${thing.hashCode}");
},
);
});
messages = streams$.flatMap((i) => i.value).share();
并以如下所示的调用图结束
// <- push [token]
flutter: setup A 253354366
flutter: setup B 422603720
// <- push [token]
flutter: teardown B 422603720
flutter: teardown A 253354366
flutter: setup A 260164938
flutter: setup B 161253018
一般的设计问题可以描述为:
我有一个 websocket 连接,它有一个严格的生命周期要遵守——它希望 connect
和 disconnect
被适当地调用,而且,因为它与系统对话,它使用 .在这个 websocket 连接中,我们有多个不同的订阅对象,每个对象都有一个严格的生命周期,它希望得到尊重(subscribe
和 unsubscribe
),并且它取决于这些操作的父 websocket 的状态要成功。
这是三个嵌套生命周期可观察对象的理想行为的时间表,其中 C 依赖于 B,B 又依赖于 A:
A = someInput.switchMap((i) => LifecycleObservable())
B = A.switchMap((a) => LifecycleObservable())
C = B.switchMap((b) => LifecycleObservable())
C.listen(print);
// <-- listen to c
// <-- produce [someInput]
setup A
setup B
setup C
// <-- c is produced
// <-- c is unsubscribed
teardown C
teardown B
teardown A
// <-- C is re-subscribed-to
setup A
setup B
setup C
// <-- produce [someInput]
teardown C
teardown B
teardown A
setup A
setup B
setup C
// <-- c is produced
第一个问题:这是反模式吗?我在网上找不到太多关于这种模式的信息,但这似乎是一种非常标准的东西,你会 运行 进入 observables:一些对象只有一个生命周期,而一些对象可能想要就靠那个了。
我可以使用类似这样的东西非常接近这种理想行为:
class LifecycleObservable {
static Observable<T> fromObservable<T>({
@required Observable<T> input,
@required Future<void> Function(T) setup,
@required Future<void> Function(T) teardown,
}) {
return input.asyncMap((T _input) async {
await setup(_input);
return _input;
}).switchMap((T _input) {
return Observable<T>(Observable.never()) //
.startWith(_input)
.doOnCancel(() async {
await teardown(_input);
});
});
}
}
此代码接受有状态对象流,运行在它们生成时 setup
在它们上 teardown
作为 [=20 中的子可观察对象 teardown
=]被取消。
当在最初理想化的时间轴中产生第二个 [someInput]
时出现问题:使用上面的代码我得到了一个像
// <-- listen to c
// <-- produce [someInput]
setup A
setup B
setup C
// <-- c is produced
// <-- produce [someInput]
teardown A
setup A
teardown B
setup B
teardown C
setup C
// <-- c is produced
问题在于,如果 B 依赖于 A(例如从依赖于开放 websocket 传输的订阅调用 unsubscribe
),此拆卸顺序会破坏每个对象的预期生命周期(订阅尝试发送unsubscribe
通过封闭的 websocket 传输。
在我看来,很简单,可观察模式无法表达这些语义。具体来说,可观察模式不是为级联依赖设计的——父可观察对象对其子可观察对象的状态一无所知。
我用下面的 dart 代码自己解决了这个问题。我敢肯定这很糟糕,但总的来说它似乎对我有用™。
class WithLifecycle<T> {
final FutureOr<void> Function() setup;
final FutureOr<void> Function() teardown;
final T value;
final WithLifecycle parent;
List<WithLifecycle> _children = [];
bool _disposed = false;
WithLifecycle({
@required this.value,
this.setup,
this.teardown,
this.parent,
});
void addDependency(WithLifecycle child) => _children.add(child);
void removeDependency(WithLifecycle child) => _children.remove(child);
Future<void> init() async {
parent?.addDependency(this);
await setup();
}
Future<void> dispose() async {
if (_disposed) {
return;
}
_disposed = true;
for (var _child in _children) {
await _child.dispose();
}
_children.clear();
await teardown();
}
}
然后在使用可观察对象时用于创建必要的依赖链:
class LifecycleObservable {
static Observable<WithLifecycle<T>> fromObservable<T>({
@required Observable<T> value,
WithLifecycle parent,
@required Future<void> Function(T) setup,
@required Future<void> Function(T) teardown,
}) {
return value.concatMap((T _value) {
final withLifecycle = WithLifecycle<T>(
value: _value,
parent: parent,
setup: () => setup(_value),
teardown: () => teardown(_value),
);
return Observable<WithLifecycle<T>>(Observable.never())
.startWith(withLifecycle)
.doOnListen(() async {
await withLifecycle.init();
}).doOnCancel(() async {
await withLifecycle.dispose();
});
});
}
}
像
一样使用token$ = PublishSubject();
channel$ = token$.switchMap((token) {
return LifecycleObservable.fromObservable<IOWebSocketChannel>(
value: Observable.just(IOWebSocketChannel.connect(Constants.connectionString)),
setup: (channel) async {
print("setup A ${channel.hashCode}");
},
teardown: (channel) async {
print("teardown A ${channel.hashCode}");
await channel.sink.close(status.goingAway);
});
});
streams$ = channel$.switchMap((channel) {
return LifecycleObservable.fromObservable<Stream<String>>(
parent: channel,
value: Observable.just(channel.value.stream.cast<String>()),
setup: (thing) async {
print("setup B ${thing.hashCode}");
},
teardown: (thing) async {
print("teardown B ${thing.hashCode}");
},
);
});
messages = streams$.flatMap((i) => i.value).share();
并以如下所示的调用图结束
// <- push [token]
flutter: setup A 253354366
flutter: setup B 422603720
// <- push [token]
flutter: teardown B 422603720
flutter: teardown A 253354366
flutter: setup A 260164938
flutter: setup B 161253018