获取所有订阅何时结束
Get when all subscriptions have ended
我正在创建多个订阅。
我需要检测它们(全部)何时完成才能执行操作。
private IList<IDisposable> subscriptions = new List<IDisposable>();
private void CreateSubscriptions(IEnumerable<int> integers)
{
if (this.subscriptions.Any())
foreach (IDisposable subscription in this.subscriptions)
subscription.Dispose();
this.subscriptions.Clear();
for (int i = 0; i < 50; i++)
{
this.subscriptions.Add(
Observable.Create<int>(
observer =>
{
foreach (int i in integers)
observer.OnNext(i);
observer.Completed();
return System.Reactive.Disposables.Disposable.Empty;
}
)
.SubscribeOn(System.Reactive.Concurrency.TaskPoolScheduler.Default)
.ObserveOn(this.fuasTileControl)
.Subscribe(
// action
);
}
}
如您所见,我正在创建 50 个可观察值,用于通知 IEnumerable<int>
的下一个值。这是一个东西示例代码。
我需要知道所有订阅何时结束(完成)才能执行操作。
有什么想法吗?
可能有更好的选择,但我知道的是使用 Merge
运算符。参见 http://reactivex.io/documentation/operators/merge.html。
然后您可以订阅这个 observable。如果您需要阻塞解决方案,可以改用 Wait()
。
我也对其他解决方案感兴趣。
void Main()
{
var sequence = CreateSubscriptions( new[] {1,5,6});
sequence.Subscribe((i) => {}, () => { Console.WriteLine("all ready"); });
//sequence.Wait();
//Console.WriteLine("all ready");
}
private IList<IDisposable> subscriptions = new List<IDisposable>();
private IObservable<int> CreateSubscriptions(IEnumerable<int> integers)
{
if (this.subscriptions.Any())
foreach (IDisposable subscription in this.subscriptions)
subscription.Dispose();
this.subscriptions.Clear();
var sequences = new List<IObservable<int>>();
for (int ix = 0; ix < 5; ix++)
{
var sequence = Observable.Create<int>(
observer =>
{
foreach (int i in integers)
observer.OnNext(i);
observer.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty;
}
);
sequences.Add(sequence);
this.subscriptions.Add(sequence.Subscribe(nr =>
{
Console.WriteLine(nr);
}));
}
return Observable.Merge(sequences);
}
我也做了一些小的修改以使其可以编译。
slightly-pedantic,但正确的答案是这是不可能的:订阅由 IDisposable
表示,它不提供任何外部方式来检测它何时终止(除了直接导致终止)。更准确地说,考虑到您在上面说明的用于填充 IList<IDisposable> subscriptions
的函数,当所有这些订阅都被处理时,没有机制可以 act/subscribe。
然而,有一种方法可以对一组可观察对象(由 IObservable
表示)以及何时完成。
给定 IEnumerable<IObservable> observables
或 IObservable<IObservable> observables
,以下将起作用:
observables
.Merge()
.Subscribe(
item => { /*onNext handler*/ },
e => { /*onError handler*/ },
() => { /* onCompleted handler */});
您可能希望将您想要的任何操作作为 onCompleted
处理程序。
我正在创建多个订阅。
我需要检测它们(全部)何时完成才能执行操作。
private IList<IDisposable> subscriptions = new List<IDisposable>();
private void CreateSubscriptions(IEnumerable<int> integers)
{
if (this.subscriptions.Any())
foreach (IDisposable subscription in this.subscriptions)
subscription.Dispose();
this.subscriptions.Clear();
for (int i = 0; i < 50; i++)
{
this.subscriptions.Add(
Observable.Create<int>(
observer =>
{
foreach (int i in integers)
observer.OnNext(i);
observer.Completed();
return System.Reactive.Disposables.Disposable.Empty;
}
)
.SubscribeOn(System.Reactive.Concurrency.TaskPoolScheduler.Default)
.ObserveOn(this.fuasTileControl)
.Subscribe(
// action
);
}
}
如您所见,我正在创建 50 个可观察值,用于通知 IEnumerable<int>
的下一个值。这是一个东西示例代码。
我需要知道所有订阅何时结束(完成)才能执行操作。
有什么想法吗?
可能有更好的选择,但我知道的是使用 Merge
运算符。参见 http://reactivex.io/documentation/operators/merge.html。
然后您可以订阅这个 observable。如果您需要阻塞解决方案,可以改用 Wait()
。
我也对其他解决方案感兴趣。
void Main()
{
var sequence = CreateSubscriptions( new[] {1,5,6});
sequence.Subscribe((i) => {}, () => { Console.WriteLine("all ready"); });
//sequence.Wait();
//Console.WriteLine("all ready");
}
private IList<IDisposable> subscriptions = new List<IDisposable>();
private IObservable<int> CreateSubscriptions(IEnumerable<int> integers)
{
if (this.subscriptions.Any())
foreach (IDisposable subscription in this.subscriptions)
subscription.Dispose();
this.subscriptions.Clear();
var sequences = new List<IObservable<int>>();
for (int ix = 0; ix < 5; ix++)
{
var sequence = Observable.Create<int>(
observer =>
{
foreach (int i in integers)
observer.OnNext(i);
observer.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty;
}
);
sequences.Add(sequence);
this.subscriptions.Add(sequence.Subscribe(nr =>
{
Console.WriteLine(nr);
}));
}
return Observable.Merge(sequences);
}
我也做了一些小的修改以使其可以编译。
slightly-pedantic,但正确的答案是这是不可能的:订阅由 IDisposable
表示,它不提供任何外部方式来检测它何时终止(除了直接导致终止)。更准确地说,考虑到您在上面说明的用于填充 IList<IDisposable> subscriptions
的函数,当所有这些订阅都被处理时,没有机制可以 act/subscribe。
然而,有一种方法可以对一组可观察对象(由 IObservable
表示)以及何时完成。
给定 IEnumerable<IObservable> observables
或 IObservable<IObservable> observables
,以下将起作用:
observables
.Merge()
.Subscribe(
item => { /*onNext handler*/ },
e => { /*onError handler*/ },
() => { /* onCompleted handler */});
您可能希望将您想要的任何操作作为 onCompleted
处理程序。