重放值或错误的惰性可观察序列

Lazy observable sequence that replays value or error

我正在尝试创建具有以下特征的可观察管道:

我这辈子都想不出正确的语义来完成这个。我认为做这样的事情会很简单:

Observable
    .Defer(() => Observable
        .Start(() => { /* do something */ })
        .PublishLast()
        .ConnectUntilCompleted());

其中 ConnectUntilCompleted 就像它听起来的那样:

public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
    @this.Connect();
    return @this;
}

这似乎在 observable 成功终止时有效,但在出现错误时无效。任何订阅者都没有收到错误:

[Fact]
public void test()
{
    var o = Observable
        .Defer(() => Observable
            .Start(() => { throw new InvalidOperationException(); })
            .PublishLast()
            .ConnectUntilCompleted());

    // this does not throw!
    o.Subscribe();
}

谁能告诉我我做错了什么?为什么 Publish 不重播它收到的任何错误?

更新:它变得更加陌生:

[Fact]
public void test()
{
    var o = Observable
        .Defer(() => Observable
            .Start(() => { throw new InvalidOperationException(); })
            .PublishLast()
            .ConnectUntilCompleted())
        .Do(
            _ => { },
            ex => { /* this executes */ });

    // this does not throw!
    o.Subscribe();

    o.Subscribe(
        _ => { },
        ex => { /* even though this executes */ });
}

试试你这个版本的ConnectUntilCompleted方法:

public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
    return Observable.Create<T>(o =>
    {
        var subscription = @this.Subscribe(o);
        var connection = @this.Connect();
        return new CompositeDisposable(subscription, connection);
    });
}

允许 Rx 正常运行。

现在我已经添加到它以帮助显示发生了什么:

public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
    return Observable.Create<T>(o =>
    {
        var disposed = Disposable.Create(() => Console.WriteLine("Disposed!"));
        var subscription = Observable
            .Defer<T>(() => { Console.WriteLine("Subscribing!"); return @this; })
            .Subscribe(o);
        Console.WriteLine("Connecting!");
        var connection = @this.Connect();
        return new CompositeDisposable(disposed, subscription, connection);
    });
}

现在您的 observable 看起来像这样:

var o =
    Observable
        .Defer(() =>
            Observable
                .Start(() =>
                {
                    Console.WriteLine("Started.");
                    throw new InvalidOperationException();
                }))
        .PublishLast()
        .ConnectUntilCompleted();

最后的关键是实际处理订阅中的错误 - 所以仅仅做到 o.Subscribe().

是不够的

所以这样做:

        o.Subscribe(
            x => Console.WriteLine(x),
            e => Console.WriteLine(e.Message),
            () =>  Console.WriteLine("Done."));

        o.Subscribe(
            x => Console.WriteLine(x),
            e => Console.WriteLine(e.Message),
            () =>  Console.WriteLine("Done."));

        o.Subscribe(
            x => Console.WriteLine(x),
            e => Console.WriteLine(e.Message),
            () =>  Console.WriteLine("Done."));         

当我 运行 我得到这个:

Subscribing!
Connecting!
Subscribing!
Connecting!
Subscribing!
Connecting!
Started.
Operation is not valid due to the current state of the object.
Disposed!
Operation is not valid due to the current state of the object.
Disposed!
Operation is not valid due to the current state of the object.
Disposed!

注意"Started"只出现一次,但是报错3次

(有时 Started 在第一次订阅后会出现在列表中的较高位置。)

我认为这就是您想要的描述。

实现您的要求的另一种方法是:

var lazy = new Lazy<Task>(async () => { /* execute once */ }, isThreadSafe: true);
var o = Observable.FromAsync(() => lazy.Value);

第一次订阅时,lazy 会创建(并执行)任务。对于其他订阅,lazy 将 return 相同(可能已经完成或失败)的任务。

为了支持@Engimativity 的回答,我想展示您应该如何运行进行测试,这样您就不会再得到这些"surprises"。您的测试是不确定的,因为它们是 multi-threaded/concurrent。您在不提供 IScheduler 的情况下使用 Observable.Start 是有问题的。如果您 运行 使用 TestScheduler 进行测试,您的测试现在将是单线程和确定性的

[Test]
public void Test()
{
    var testScheduler = new TestScheduler();
    var o = Observable
        .Defer(() => Observable
            .Start(() => { throw new InvalidOperationException(); }, testScheduler)
            .PublishLast()
            .ConnectUntilCompleted());

    var observer = testScheduler.CreateObserver<Unit>();
    o.Subscribe(observer);

    testScheduler.Start();

    CollectionAssert.IsNotEmpty(observer.Messages);
    Assert.AreEqual(NotificationKind.OnError, observer.Messages[0].Value.Kind);
}