重放值或错误的惰性可观察序列
Lazy observable sequence that replays value or error
我正在尝试创建具有以下特征的可观察管道:
- 懒惰(在有人订阅之前什么都不做)
- 无论收到多少订阅,最多执行一次
- 重播其结果值(如果有)或
- 重播其产生的错误,如果有的话
我这辈子都想不出正确的语义来完成这个。我认为做这样的事情会很简单:
Observable
.Defer(() => Observable
.Start(() => { /* do something */ })
.PublishLast()
.ConnectUntilCompleted());
其中 ConnectUntilCompleted
就像它听起来的那样:
public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
@this.Connect();
return @this;
}
这似乎在 observable 成功终止时有效,但在出现错误时无效。任何订阅者都没有收到错误:
[Fact]
public void test()
{
var o = Observable
.Defer(() => Observable
.Start(() => { throw new InvalidOperationException(); })
.PublishLast()
.ConnectUntilCompleted());
// this does not throw!
o.Subscribe();
}
谁能告诉我我做错了什么?为什么 Publish
不重播它收到的任何错误?
更新:它变得更加陌生:
[Fact]
public void test()
{
var o = Observable
.Defer(() => Observable
.Start(() => { throw new InvalidOperationException(); })
.PublishLast()
.ConnectUntilCompleted())
.Do(
_ => { },
ex => { /* this executes */ });
// this does not throw!
o.Subscribe();
o.Subscribe(
_ => { },
ex => { /* even though this executes */ });
}
试试你这个版本的ConnectUntilCompleted
方法:
public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
return Observable.Create<T>(o =>
{
var subscription = @this.Subscribe(o);
var connection = @this.Connect();
return new CompositeDisposable(subscription, connection);
});
}
允许 Rx 正常运行。
现在我已经添加到它以帮助显示发生了什么:
public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
return Observable.Create<T>(o =>
{
var disposed = Disposable.Create(() => Console.WriteLine("Disposed!"));
var subscription = Observable
.Defer<T>(() => { Console.WriteLine("Subscribing!"); return @this; })
.Subscribe(o);
Console.WriteLine("Connecting!");
var connection = @this.Connect();
return new CompositeDisposable(disposed, subscription, connection);
});
}
现在您的 observable 看起来像这样:
var o =
Observable
.Defer(() =>
Observable
.Start(() =>
{
Console.WriteLine("Started.");
throw new InvalidOperationException();
}))
.PublishLast()
.ConnectUntilCompleted();
最后的关键是实际处理订阅中的错误 - 所以仅仅做到 o.Subscribe()
.
是不够的
所以这样做:
o.Subscribe(
x => Console.WriteLine(x),
e => Console.WriteLine(e.Message),
() => Console.WriteLine("Done."));
o.Subscribe(
x => Console.WriteLine(x),
e => Console.WriteLine(e.Message),
() => Console.WriteLine("Done."));
o.Subscribe(
x => Console.WriteLine(x),
e => Console.WriteLine(e.Message),
() => Console.WriteLine("Done."));
当我 运行 我得到这个:
Subscribing!
Connecting!
Subscribing!
Connecting!
Subscribing!
Connecting!
Started.
Operation is not valid due to the current state of the object.
Disposed!
Operation is not valid due to the current state of the object.
Disposed!
Operation is not valid due to the current state of the object.
Disposed!
注意"Started"只出现一次,但是报错3次
(有时 Started
在第一次订阅后会出现在列表中的较高位置。)
我认为这就是您想要的描述。
实现您的要求的另一种方法是:
var lazy = new Lazy<Task>(async () => { /* execute once */ }, isThreadSafe: true);
var o = Observable.FromAsync(() => lazy.Value);
第一次订阅时,lazy
会创建(并执行)任务。对于其他订阅,lazy
将 return 相同(可能已经完成或失败)的任务。
为了支持@Engimativity 的回答,我想展示您应该如何运行进行测试,这样您就不会再得到这些"surprises"。您的测试是不确定的,因为它们是 multi-threaded/concurrent。您在不提供 IScheduler
的情况下使用 Observable.Start
是有问题的。如果您 运行 使用 TestScheduler
进行测试,您的测试现在将是单线程和确定性的
[Test]
public void Test()
{
var testScheduler = new TestScheduler();
var o = Observable
.Defer(() => Observable
.Start(() => { throw new InvalidOperationException(); }, testScheduler)
.PublishLast()
.ConnectUntilCompleted());
var observer = testScheduler.CreateObserver<Unit>();
o.Subscribe(observer);
testScheduler.Start();
CollectionAssert.IsNotEmpty(observer.Messages);
Assert.AreEqual(NotificationKind.OnError, observer.Messages[0].Value.Kind);
}
我正在尝试创建具有以下特征的可观察管道:
- 懒惰(在有人订阅之前什么都不做)
- 无论收到多少订阅,最多执行一次
- 重播其结果值(如果有)或
- 重播其产生的错误,如果有的话
我这辈子都想不出正确的语义来完成这个。我认为做这样的事情会很简单:
Observable
.Defer(() => Observable
.Start(() => { /* do something */ })
.PublishLast()
.ConnectUntilCompleted());
其中 ConnectUntilCompleted
就像它听起来的那样:
public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
@this.Connect();
return @this;
}
这似乎在 observable 成功终止时有效,但在出现错误时无效。任何订阅者都没有收到错误:
[Fact]
public void test()
{
var o = Observable
.Defer(() => Observable
.Start(() => { throw new InvalidOperationException(); })
.PublishLast()
.ConnectUntilCompleted());
// this does not throw!
o.Subscribe();
}
谁能告诉我我做错了什么?为什么 Publish
不重播它收到的任何错误?
更新:它变得更加陌生:
[Fact]
public void test()
{
var o = Observable
.Defer(() => Observable
.Start(() => { throw new InvalidOperationException(); })
.PublishLast()
.ConnectUntilCompleted())
.Do(
_ => { },
ex => { /* this executes */ });
// this does not throw!
o.Subscribe();
o.Subscribe(
_ => { },
ex => { /* even though this executes */ });
}
试试你这个版本的ConnectUntilCompleted
方法:
public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
return Observable.Create<T>(o =>
{
var subscription = @this.Subscribe(o);
var connection = @this.Connect();
return new CompositeDisposable(subscription, connection);
});
}
允许 Rx 正常运行。
现在我已经添加到它以帮助显示发生了什么:
public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
return Observable.Create<T>(o =>
{
var disposed = Disposable.Create(() => Console.WriteLine("Disposed!"));
var subscription = Observable
.Defer<T>(() => { Console.WriteLine("Subscribing!"); return @this; })
.Subscribe(o);
Console.WriteLine("Connecting!");
var connection = @this.Connect();
return new CompositeDisposable(disposed, subscription, connection);
});
}
现在您的 observable 看起来像这样:
var o =
Observable
.Defer(() =>
Observable
.Start(() =>
{
Console.WriteLine("Started.");
throw new InvalidOperationException();
}))
.PublishLast()
.ConnectUntilCompleted();
最后的关键是实际处理订阅中的错误 - 所以仅仅做到 o.Subscribe()
.
所以这样做:
o.Subscribe(
x => Console.WriteLine(x),
e => Console.WriteLine(e.Message),
() => Console.WriteLine("Done."));
o.Subscribe(
x => Console.WriteLine(x),
e => Console.WriteLine(e.Message),
() => Console.WriteLine("Done."));
o.Subscribe(
x => Console.WriteLine(x),
e => Console.WriteLine(e.Message),
() => Console.WriteLine("Done."));
当我 运行 我得到这个:
Subscribing! Connecting! Subscribing! Connecting! Subscribing! Connecting! Started. Operation is not valid due to the current state of the object. Disposed! Operation is not valid due to the current state of the object. Disposed! Operation is not valid due to the current state of the object. Disposed!
注意"Started"只出现一次,但是报错3次
(有时 Started
在第一次订阅后会出现在列表中的较高位置。)
我认为这就是您想要的描述。
实现您的要求的另一种方法是:
var lazy = new Lazy<Task>(async () => { /* execute once */ }, isThreadSafe: true);
var o = Observable.FromAsync(() => lazy.Value);
第一次订阅时,lazy
会创建(并执行)任务。对于其他订阅,lazy
将 return 相同(可能已经完成或失败)的任务。
为了支持@Engimativity 的回答,我想展示您应该如何运行进行测试,这样您就不会再得到这些"surprises"。您的测试是不确定的,因为它们是 multi-threaded/concurrent。您在不提供 IScheduler
的情况下使用 Observable.Start
是有问题的。如果您 运行 使用 TestScheduler
进行测试,您的测试现在将是单线程和确定性的
[Test]
public void Test()
{
var testScheduler = new TestScheduler();
var o = Observable
.Defer(() => Observable
.Start(() => { throw new InvalidOperationException(); }, testScheduler)
.PublishLast()
.ConnectUntilCompleted());
var observer = testScheduler.CreateObserver<Unit>();
o.Subscribe(observer);
testScheduler.Start();
CollectionAssert.IsNotEmpty(observer.Messages);
Assert.AreEqual(NotificationKind.OnError, observer.Messages[0].Value.Kind);
}