Prepend Task to IObservable<T> (Finally 对称)

Prepend Task to IObservable<T> (symmetry of Finally)

我想在第一个客户端连接时向我的服务器发送一个开始脉冲,并在最后一个客户端断开连接时向我的服务器发送一个结束脉冲。

public class MyAdapter : IObservable<MyType> {

    IObservable<MyType> MyObservable = BuildMyObservable()
        .Initially(Start) // <- this method doesn't exist
        .Finally(Stop).Publish().RefCount().Subscribe(observer);

    public IDisposable Subscribe(IObserver<MyType> observer);
        return MyObservable.Subscribe(observer)
    }

    async Task Start() { /* start UDP stream */ }
    async Task Stop() { /* stop UDP stream */ }
    IObservable<MyType> BuildMyObservable() { /* wire up stuff */ }

}

在上面的方法中,我是在寻找一个不存在的函数Initially,还是我只是忽略了它?

我猜您正在寻找 RxJava 的 doOnSubscribe 的 .Net 等价物。开箱即用。

您可以做的是将 MyObservable 包装在 Observable.Defer 函数中,然后在 Defer 中调用您的服务器。你可以玩下面的代码看看我的意思:

class Program
{
    static void Main(string[] args)
    {
        var source = Observable.Interval(TimeSpan.FromSeconds(1));

        var published = Observable.Defer(() =>
        {
            Console.WriteLine("Start"); // Here, you post "Start" to server
            return source;
        })
        .Finally(() => Console.WriteLine("End")) // Here, you post "End"
        .Publish()
        .RefCount();

        Console.ReadLine();
        var disposable = published.Subscribe(x => Console.WriteLine("First " + x));
        Console.ReadLine();
        var disposable2 = published.Subscribe(x => Console.WriteLine("Second " + x));
        Console.ReadLine();
        disposable.Dispose();
        Console.ReadLine();
        disposable2.Dispose();
        Console.ReadLine();
        published.Subscribe(x => Console.WriteLine("Third " + x));
        Console.ReadLine();
    }
}

有关延迟的更多说明,请参阅 this excellent blogpost

使用 pmbanka 的正确答案,我为此添加了一个 Observable 扩展。

public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Func<Task> initialAction) {
    return Observable.Defer(async () =>
    {
        await initialAction();
        return resultingObservable;
    });
}

public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Action initialAction) {
    return Observable.Defer(() =>
    {
        Action();
        return resultingObservable;
    });
}

它尚未经过测试,对于 async/task lambda 可能不是最佳选择。

我想下面的也是一样的:

public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Func<Task> initialAction) {
    return Observable.Create(async (IObserver<T> observer) =>
    {
        await initialAction();
        return resultingObservable.Subscribe(observer);
    });
}

您可以像对待任何其他序列一样简单地对待任务,并将其链接到查询中

Start().ToObservable()
  .SelectMany(_=>MyObservable)
  .Finally(Stop)

作为单独的注释,我鼓励您避免制作具有采用格式 IDisposable Subscribe(IObserver<MyType> observer) 的方法的 API。这会从您的消费者那里夺走 Rx 的力量。相反,只需公开 IObservable<T>,因为它已经具有 Subscribe 方法。现在您的消费者可以链接您的序列,组合它,选择正确的 concurrency/threading 模型(使用 ObserveOn/SubscribeOn),并应用他们的错误处理要求。

另外最后一点,对作为方法调用结果的序列进行 publish-refcount 有点奇怪。当你的方法只允许消费者提供一个消费者时,发布引用计数就更奇怪了。假设您将方法签名更改为 recommended/standard 方法,那么我还建议您删除 Publish().Refcount() 代码,因为消费者极不可能缓存结果并重用它,v.s.回忆方法。或者您可以保留该方法(最好将其更改为 属性),然后在内部缓存已发布的序列。

public class MyServiceThing
{
    private readonly IObservable<MyType> _myObservable;

    public MyServiceThing()
    {
        _myObservable = Start().ToObservable()
            .SelectMany(_=>/*The thing that defines your observable sequence...*/)
            .Finally(Stop)
            .Publish().RefCount();
    }

    public IObservable<MyType> MyObservable()
    {
        return _myObservable;
    }
    //OR
    //public IObservable<MyType> MyObservable() { get { return _myObservable; } }

    private async Task Start() {}
    private async Task Stop() {}
}