Prepend Task to IObservable<T> (Finally 对称)
Prepend Task to IObservable<T> (symmetry of Finally)
我想在第一个客户端连接时向我的服务器发送一个开始脉冲,并在最后一个客户端断开连接时向我的服务器发送一个结束脉冲。
public class MyAdapter : IObservable<MyType> {
IObservable<MyType> MyObservable = BuildMyObservable()
.Initially(Start) // <- this method doesn't exist
.Finally(Stop).Publish().RefCount().Subscribe(observer);
public IDisposable Subscribe(IObserver<MyType> observer);
return MyObservable.Subscribe(observer)
}
async Task Start() { /* start UDP stream */ }
async Task Stop() { /* stop UDP stream */ }
IObservable<MyType> BuildMyObservable() { /* wire up stuff */ }
}
在上面的方法中,我是在寻找一个不存在的函数Initially
,还是我只是忽略了它?
我猜您正在寻找 RxJava 的 doOnSubscribe 的 .Net 等价物。开箱即用。
您可以做的是将 MyObservable
包装在 Observable.Defer
函数中,然后在 Defer
中调用您的服务器。你可以玩下面的代码看看我的意思:
class Program
{
static void Main(string[] args)
{
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var published = Observable.Defer(() =>
{
Console.WriteLine("Start"); // Here, you post "Start" to server
return source;
})
.Finally(() => Console.WriteLine("End")) // Here, you post "End"
.Publish()
.RefCount();
Console.ReadLine();
var disposable = published.Subscribe(x => Console.WriteLine("First " + x));
Console.ReadLine();
var disposable2 = published.Subscribe(x => Console.WriteLine("Second " + x));
Console.ReadLine();
disposable.Dispose();
Console.ReadLine();
disposable2.Dispose();
Console.ReadLine();
published.Subscribe(x => Console.WriteLine("Third " + x));
Console.ReadLine();
}
}
有关延迟的更多说明,请参阅 this excellent blogpost。
使用 pmbanka 的正确答案,我为此添加了一个 Observable 扩展。
public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Func<Task> initialAction) {
return Observable.Defer(async () =>
{
await initialAction();
return resultingObservable;
});
}
public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Action initialAction) {
return Observable.Defer(() =>
{
Action();
return resultingObservable;
});
}
它尚未经过测试,对于 async/task lambda 可能不是最佳选择。
我想下面的也是一样的:
public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Func<Task> initialAction) {
return Observable.Create(async (IObserver<T> observer) =>
{
await initialAction();
return resultingObservable.Subscribe(observer);
});
}
您可以像对待任何其他序列一样简单地对待任务,并将其链接到查询中
Start().ToObservable()
.SelectMany(_=>MyObservable)
.Finally(Stop)
作为单独的注释,我鼓励您避免制作具有采用格式 IDisposable Subscribe(IObserver<MyType> observer)
的方法的 API。这会从您的消费者那里夺走 Rx 的力量。相反,只需公开 IObservable<T>
,因为它已经具有 Subscribe
方法。现在您的消费者可以链接您的序列,组合它,选择正确的 concurrency/threading 模型(使用 ObserveOn
/SubscribeOn
),并应用他们的错误处理要求。
另外最后一点,对作为方法调用结果的序列进行 publish-refcount 有点奇怪。当你的方法只允许消费者提供一个消费者时,发布引用计数就更奇怪了。假设您将方法签名更改为 recommended/standard 方法,那么我还建议您删除 Publish().Refcount()
代码,因为消费者极不可能缓存结果并重用它,v.s.回忆方法。或者您可以保留该方法(最好将其更改为 属性),然后在内部缓存已发布的序列。
public class MyServiceThing
{
private readonly IObservable<MyType> _myObservable;
public MyServiceThing()
{
_myObservable = Start().ToObservable()
.SelectMany(_=>/*The thing that defines your observable sequence...*/)
.Finally(Stop)
.Publish().RefCount();
}
public IObservable<MyType> MyObservable()
{
return _myObservable;
}
//OR
//public IObservable<MyType> MyObservable() { get { return _myObservable; } }
private async Task Start() {}
private async Task Stop() {}
}
我想在第一个客户端连接时向我的服务器发送一个开始脉冲,并在最后一个客户端断开连接时向我的服务器发送一个结束脉冲。
public class MyAdapter : IObservable<MyType> {
IObservable<MyType> MyObservable = BuildMyObservable()
.Initially(Start) // <- this method doesn't exist
.Finally(Stop).Publish().RefCount().Subscribe(observer);
public IDisposable Subscribe(IObserver<MyType> observer);
return MyObservable.Subscribe(observer)
}
async Task Start() { /* start UDP stream */ }
async Task Stop() { /* stop UDP stream */ }
IObservable<MyType> BuildMyObservable() { /* wire up stuff */ }
}
在上面的方法中,我是在寻找一个不存在的函数Initially
,还是我只是忽略了它?
我猜您正在寻找 RxJava 的 doOnSubscribe 的 .Net 等价物。开箱即用。
您可以做的是将 MyObservable
包装在 Observable.Defer
函数中,然后在 Defer
中调用您的服务器。你可以玩下面的代码看看我的意思:
class Program
{
static void Main(string[] args)
{
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var published = Observable.Defer(() =>
{
Console.WriteLine("Start"); // Here, you post "Start" to server
return source;
})
.Finally(() => Console.WriteLine("End")) // Here, you post "End"
.Publish()
.RefCount();
Console.ReadLine();
var disposable = published.Subscribe(x => Console.WriteLine("First " + x));
Console.ReadLine();
var disposable2 = published.Subscribe(x => Console.WriteLine("Second " + x));
Console.ReadLine();
disposable.Dispose();
Console.ReadLine();
disposable2.Dispose();
Console.ReadLine();
published.Subscribe(x => Console.WriteLine("Third " + x));
Console.ReadLine();
}
}
有关延迟的更多说明,请参阅 this excellent blogpost。
使用 pmbanka 的正确答案,我为此添加了一个 Observable 扩展。
public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Func<Task> initialAction) {
return Observable.Defer(async () =>
{
await initialAction();
return resultingObservable;
});
}
public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Action initialAction) {
return Observable.Defer(() =>
{
Action();
return resultingObservable;
});
}
它尚未经过测试,对于 async/task lambda 可能不是最佳选择。
我想下面的也是一样的:
public static IObservable<T> Initially<T>(this IObservable<T> resultingObservable, Func<Task> initialAction) {
return Observable.Create(async (IObserver<T> observer) =>
{
await initialAction();
return resultingObservable.Subscribe(observer);
});
}
您可以像对待任何其他序列一样简单地对待任务,并将其链接到查询中
Start().ToObservable()
.SelectMany(_=>MyObservable)
.Finally(Stop)
作为单独的注释,我鼓励您避免制作具有采用格式 IDisposable Subscribe(IObserver<MyType> observer)
的方法的 API。这会从您的消费者那里夺走 Rx 的力量。相反,只需公开 IObservable<T>
,因为它已经具有 Subscribe
方法。现在您的消费者可以链接您的序列,组合它,选择正确的 concurrency/threading 模型(使用 ObserveOn
/SubscribeOn
),并应用他们的错误处理要求。
另外最后一点,对作为方法调用结果的序列进行 publish-refcount 有点奇怪。当你的方法只允许消费者提供一个消费者时,发布引用计数就更奇怪了。假设您将方法签名更改为 recommended/standard 方法,那么我还建议您删除 Publish().Refcount()
代码,因为消费者极不可能缓存结果并重用它,v.s.回忆方法。或者您可以保留该方法(最好将其更改为 属性),然后在内部缓存已发布的序列。
public class MyServiceThing
{
private readonly IObservable<MyType> _myObservable;
public MyServiceThing()
{
_myObservable = Start().ToObservable()
.SelectMany(_=>/*The thing that defines your observable sequence...*/)
.Finally(Stop)
.Publish().RefCount();
}
public IObservable<MyType> MyObservable()
{
return _myObservable;
}
//OR
//public IObservable<MyType> MyObservable() { get { return _myObservable; } }
private async Task Start() {}
private async Task Stop() {}
}