如何对第一个订阅创建和最后一个订阅处理引入副作用?
How do I introduce a side effect on the first subscription creation and last subscription disposal?
我有一个 class 必须执行 setup/teardown 才能生成包装为可观察对象的事件。为简洁起见,我在下面的代码中使用 Observable.Interval 生成一个可观察流。我需要在此代码中进行哪些更改才能使 Start 和 Stop 被调用两次?实际上,每次引用计数递增到 1 时都需要调用 Start,而每次引用计数递减到 0 时都需要调用 Stop。
static async Task Main(string[] args)
{
var provider = new SomeProvider();
var s1 = provider.Subscribe(on1);
var s2 = provider.Subscribe(on2);
await Task.Delay(5000);
s1.Dispose();
s2.Dispose();
var s3 = provider.Subscribe(on1);
var s4 = provider.Subscribe(on2);
await Task.Delay(5000);
s3.Dispose();
s4.Dispose();
}
private static void on1(long obj) => Console.WriteLine("on1");
private static void on2(long obj) => Console.WriteLine("on2");
public class SomeProvider : IObservable<long>
{
//Example observable stream
readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).Do(Start).Publish().RefCount();
public IDisposable Subscribe(IObserver<long> observer)
{
var subscription = timer.Subscribe(observer);
return new CompositeDisposable(subscription, Disposable.Create(() => { Stop(); }));
}
private static void Start(long obj) => Console.WriteLine("STARTED");
private void Stop() => Console.WriteLine("STOPPED");
}
任何时候你做source.Publish().RefCount()
,当引用计数变为1时将调用源的Subscribe
方法,当引用计数变为0时将处理订阅。你可以使用这个事实来实现你想要的行为。像这样的东西应该可以工作(下面未经测试的代码):
IObservable<T> MonitorSubscription<T>(IObservable<T> source, Action onSubscribe, Action onDispose)
{
//TODO: argument error checking/handling
return Observable.Create<T>(obs =>
{
onSubscribe();
var subscription = source.Subscribe(obs);
return Disposable.Create(() =>
{
subscription.Dispose();
onDispose();
});
});
}
您可以在具有“开始”和“停止”功能的地方调用此功能。
readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).MonitorSubscription(Start, Stop).Publish().RefCount();
现在您将在计时器的引用计数变为 1 时调用 Start,在引用计数变为 0 时调用 Stop。
您可以像这样使用 Empty
、Finally
和 Concat
运算符:
readonly IObservable<long> timer = Observable
.Empty<long>()
.Finally(() => Console.WriteLine($"Started"))
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)))
.Finally(() => Console.WriteLine($"Stopped"))
.Publish()
.RefCount();
首次订阅时打印"Started"
,最后退订时打印"Stopped"
。
或者您可以使用下面的扩展方法 OnSubscription
。它应该稍微更有效率,因为它避免了链接。
/// <summary>Invokes a specific action before every subscription.</summary>
public static IObservable<T> OnSubscription<T>(this IObservable<T> source, Action action)
=> Observable.Create<T>(o => { action(); return source.Subscribe(o); });
用法示例:
readonly IObservable<long> timer = Observable
.Interval(TimeSpan.FromSeconds(1))
.OnSubscription(() => Console.WriteLine($"Started"))
.Finally(() => Console.WriteLine($"Stopped"))
.Publish()
.RefCount();
借助前面的答案,我创建了这个扩展方法:
public static IObservable<T> OnSubscriptionDo<T>(this IObservable<T> source, Action subscribeAction, Action unsubscribeAction = null) =>
(null == unsubscribeAction) ?
source.CombineLatest(Observable.Return(Unit.Default).Finally(subscribeAction), (evt, _) => evt).Publish().RefCount() :
source.CombineLatest(Observable.Return(Unit.Default).Finally(subscribeAction), (evt, _) => evt).Finally(unsubscribeAction).Publish().RefCount();
“执行”是为了提供一些指示,表明此方法会引入副作用。然后可观察流定义如下:
readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).OnSubscriptionDo(Start, Stop);
我有一个 class 必须执行 setup/teardown 才能生成包装为可观察对象的事件。为简洁起见,我在下面的代码中使用 Observable.Interval 生成一个可观察流。我需要在此代码中进行哪些更改才能使 Start 和 Stop 被调用两次?实际上,每次引用计数递增到 1 时都需要调用 Start,而每次引用计数递减到 0 时都需要调用 Stop。
static async Task Main(string[] args)
{
var provider = new SomeProvider();
var s1 = provider.Subscribe(on1);
var s2 = provider.Subscribe(on2);
await Task.Delay(5000);
s1.Dispose();
s2.Dispose();
var s3 = provider.Subscribe(on1);
var s4 = provider.Subscribe(on2);
await Task.Delay(5000);
s3.Dispose();
s4.Dispose();
}
private static void on1(long obj) => Console.WriteLine("on1");
private static void on2(long obj) => Console.WriteLine("on2");
public class SomeProvider : IObservable<long>
{
//Example observable stream
readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).Do(Start).Publish().RefCount();
public IDisposable Subscribe(IObserver<long> observer)
{
var subscription = timer.Subscribe(observer);
return new CompositeDisposable(subscription, Disposable.Create(() => { Stop(); }));
}
private static void Start(long obj) => Console.WriteLine("STARTED");
private void Stop() => Console.WriteLine("STOPPED");
}
任何时候你做source.Publish().RefCount()
,当引用计数变为1时将调用源的Subscribe
方法,当引用计数变为0时将处理订阅。你可以使用这个事实来实现你想要的行为。像这样的东西应该可以工作(下面未经测试的代码):
IObservable<T> MonitorSubscription<T>(IObservable<T> source, Action onSubscribe, Action onDispose)
{
//TODO: argument error checking/handling
return Observable.Create<T>(obs =>
{
onSubscribe();
var subscription = source.Subscribe(obs);
return Disposable.Create(() =>
{
subscription.Dispose();
onDispose();
});
});
}
您可以在具有“开始”和“停止”功能的地方调用此功能。
readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).MonitorSubscription(Start, Stop).Publish().RefCount();
现在您将在计时器的引用计数变为 1 时调用 Start,在引用计数变为 0 时调用 Stop。
您可以像这样使用 Empty
、Finally
和 Concat
运算符:
readonly IObservable<long> timer = Observable
.Empty<long>()
.Finally(() => Console.WriteLine($"Started"))
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)))
.Finally(() => Console.WriteLine($"Stopped"))
.Publish()
.RefCount();
首次订阅时打印"Started"
,最后退订时打印"Stopped"
。
或者您可以使用下面的扩展方法 OnSubscription
。它应该稍微更有效率,因为它避免了链接。
/// <summary>Invokes a specific action before every subscription.</summary>
public static IObservable<T> OnSubscription<T>(this IObservable<T> source, Action action)
=> Observable.Create<T>(o => { action(); return source.Subscribe(o); });
用法示例:
readonly IObservable<long> timer = Observable
.Interval(TimeSpan.FromSeconds(1))
.OnSubscription(() => Console.WriteLine($"Started"))
.Finally(() => Console.WriteLine($"Stopped"))
.Publish()
.RefCount();
借助前面的答案,我创建了这个扩展方法:
public static IObservable<T> OnSubscriptionDo<T>(this IObservable<T> source, Action subscribeAction, Action unsubscribeAction = null) =>
(null == unsubscribeAction) ?
source.CombineLatest(Observable.Return(Unit.Default).Finally(subscribeAction), (evt, _) => evt).Publish().RefCount() :
source.CombineLatest(Observable.Return(Unit.Default).Finally(subscribeAction), (evt, _) => evt).Finally(unsubscribeAction).Publish().RefCount();
“执行”是为了提供一些指示,表明此方法会引入副作用。然后可观察流定义如下:
readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).OnSubscriptionDo(Start, Stop);