请求完成时通知可观察对象完成
Notify an observable to complete when request has finished
我对反应式编程还比较陌生,即使在阅读了 Rx.Net in Action 之后,我仍然在努力理解它。
我有一个服务 class,它有两个 public 方法 Get
和 GetStream
。
GetStream
将创建一个 observable,它将连接 store
和 update
observable,以便消费者毫无问题地接收未来的更新。
update
observable 每分钟从 providers
执行符号更新 - 轮询 api 请求
Get
将创建一个订阅来自 store
的项目的可观察对象,并与正在更新的 update
可观察对象连接 if。问题是更新完成后,没有 OnComplete
被调用并且消费者无限期挂起。我将这个 observable 用作某些地方的阻塞调用,而在其他地方用作非阻塞调用。
我想我可以摆脱 ResetEvent
但这似乎不是一种非常被动的处理方式。有更好的响应式替代方案吗?也许我正在尝试在应该 return 和 IEnumerable
?
的地方安装一个可观察的模式
目标是让一个可观察对象每隔指定的时间间隔更新一次存储。有一种方法可以获取商店的当前状态,包括当前更新。有另一种方法来获取商店的状态和任何未来的更新。
这是我目前所处位置的代码片段。
public class SymbolService : ISymbolService, IDisposable
{
private readonly ILogger _logger = Logger.Create(nameof(SymbolService));
private readonly IObservable<(string Provider, Symbol Symbol)> _update;
private readonly ISymbolStore _store;
private readonly IDisposable _token;
private bool _updating = true;
public SymbolService(SymbolOptions options, ISymbolStore store,
IEnumerable<ISymbolProvider> providers)
{
_store = store;
// Request and aggregate symbols from providers and put
// into store.
var source = providers
.Select(provider => provider
.GetSymbols()
.Select(symbol => (provider.Name, symbol))
.Take(10))
.Merge()
.Do(entry =>
{
_updating = true;
_store.Put(entry);
},
() =>
_updating = false)
.Log(_logger, "Symbol");
// Perform update of symbols every interval
var update = Observable
.Timer(TimeSpan.Zero, options.RefreshInterval)
.SelectMany(_ => source)
.Publish();
_update = update;
_token = update
.Connect();
}
/// <summary>
/// Gets all symbols for a given provider.
/// </summary>
/// <param name="provider">The specified provider.</param>
/// <returns>The observable stream.</returns>
public IObservable<Symbol> Get(string provider)
{
// Get symbols from store by provider and concat stream
// if update is being performed
return _store
.Get(provider)
.Concat(Observable
.If(() => _updating, _update
.TakeWhile(_ => _updating)
.Where(x => x.Provider.Equals(provider))
.Select(x => x.Symbol)));
}
/// <summary>
/// Gets all symbols and future symbols for a given provider.
/// </summary>
/// <param name="provider">The specified provider.</param>
/// <returns>The observable stream.</returns>
public IObservable<Symbol> GetStream(string provider)
{
// Get symbols from store by provider and concat stream
// of all future symbols
return _store
.Get(provider)
.Concat(_update
.Where(x => x.Provider.Equals(provider))
.Select(x => x.Symbol))
.Distinct();
}
public void Dispose()
{
_token?.Dispose();
}
}
非常感谢对此的任何帮助。
再次浏览完这本书后,我认为 BehaviourSubject
非常适合该解决方案。我只包含了回答我的问题的相关代码片段。
正在订阅 BehaviourSubject
,当 update
完成处理后会收到通知,从而完成可观察并处置令牌。
private readonly BehaviorSubject<bool> _updating;
public SymbolService(SymbolOptions options, ISymbolStore store,
IEnumerable<ISymbolProvider> providers)
{
_store = store;
_updating = new BehaviorSubject<bool>(true);
// Request and aggregate symbols from providers and put
// into store.
var source = providers
.Select(provider => provider
.GetSymbols()
.Select(symbol => (provider.Name, symbol)))
.Merge();
// Perform update of symbols every interval
var update = Observable
.Timer(TimeSpan.Zero, options.RefreshInterval)
.Log(_logger, "Updating", LogEventLevel.Information)
.Do(_ => _updating.OnNext(true))
.SelectMany(_ =>
{
return source
.GetSymbols()
.Do(_store.Put)
.Finally(() => _updating.OnNext(false));
})
.Publish();
_update = update;
_token = update
.Connect();
}
public IObservable<Symbol> Get(string provider)
{
// Get symbols from store by provider and concat stream
// if update is being performed
return _store
.Get(provider)
.Concat(_update
.Where(x => x.Provider.Equals(provider))
.Select(x => x.Symbol))
.TakeUntil(_updating.Where(x => x == false));
}
我对反应式编程还比较陌生,即使在阅读了 Rx.Net in Action 之后,我仍然在努力理解它。
我有一个服务 class,它有两个 public 方法 Get
和 GetStream
。
GetStream
将创建一个 observable,它将连接 store
和 update
observable,以便消费者毫无问题地接收未来的更新。
update
observable 每分钟从 providers
执行符号更新 - 轮询 api 请求
Get
将创建一个订阅来自 store
的项目的可观察对象,并与正在更新的 update
可观察对象连接 if。问题是更新完成后,没有 OnComplete
被调用并且消费者无限期挂起。我将这个 observable 用作某些地方的阻塞调用,而在其他地方用作非阻塞调用。
我想我可以摆脱 ResetEvent
但这似乎不是一种非常被动的处理方式。有更好的响应式替代方案吗?也许我正在尝试在应该 return 和 IEnumerable
?
目标是让一个可观察对象每隔指定的时间间隔更新一次存储。有一种方法可以获取商店的当前状态,包括当前更新。有另一种方法来获取商店的状态和任何未来的更新。
这是我目前所处位置的代码片段。
public class SymbolService : ISymbolService, IDisposable
{
private readonly ILogger _logger = Logger.Create(nameof(SymbolService));
private readonly IObservable<(string Provider, Symbol Symbol)> _update;
private readonly ISymbolStore _store;
private readonly IDisposable _token;
private bool _updating = true;
public SymbolService(SymbolOptions options, ISymbolStore store,
IEnumerable<ISymbolProvider> providers)
{
_store = store;
// Request and aggregate symbols from providers and put
// into store.
var source = providers
.Select(provider => provider
.GetSymbols()
.Select(symbol => (provider.Name, symbol))
.Take(10))
.Merge()
.Do(entry =>
{
_updating = true;
_store.Put(entry);
},
() =>
_updating = false)
.Log(_logger, "Symbol");
// Perform update of symbols every interval
var update = Observable
.Timer(TimeSpan.Zero, options.RefreshInterval)
.SelectMany(_ => source)
.Publish();
_update = update;
_token = update
.Connect();
}
/// <summary>
/// Gets all symbols for a given provider.
/// </summary>
/// <param name="provider">The specified provider.</param>
/// <returns>The observable stream.</returns>
public IObservable<Symbol> Get(string provider)
{
// Get symbols from store by provider and concat stream
// if update is being performed
return _store
.Get(provider)
.Concat(Observable
.If(() => _updating, _update
.TakeWhile(_ => _updating)
.Where(x => x.Provider.Equals(provider))
.Select(x => x.Symbol)));
}
/// <summary>
/// Gets all symbols and future symbols for a given provider.
/// </summary>
/// <param name="provider">The specified provider.</param>
/// <returns>The observable stream.</returns>
public IObservable<Symbol> GetStream(string provider)
{
// Get symbols from store by provider and concat stream
// of all future symbols
return _store
.Get(provider)
.Concat(_update
.Where(x => x.Provider.Equals(provider))
.Select(x => x.Symbol))
.Distinct();
}
public void Dispose()
{
_token?.Dispose();
}
}
非常感谢对此的任何帮助。
再次浏览完这本书后,我认为 BehaviourSubject
非常适合该解决方案。我只包含了回答我的问题的相关代码片段。
正在订阅 BehaviourSubject
,当 update
完成处理后会收到通知,从而完成可观察并处置令牌。
private readonly BehaviorSubject<bool> _updating;
public SymbolService(SymbolOptions options, ISymbolStore store,
IEnumerable<ISymbolProvider> providers)
{
_store = store;
_updating = new BehaviorSubject<bool>(true);
// Request and aggregate symbols from providers and put
// into store.
var source = providers
.Select(provider => provider
.GetSymbols()
.Select(symbol => (provider.Name, symbol)))
.Merge();
// Perform update of symbols every interval
var update = Observable
.Timer(TimeSpan.Zero, options.RefreshInterval)
.Log(_logger, "Updating", LogEventLevel.Information)
.Do(_ => _updating.OnNext(true))
.SelectMany(_ =>
{
return source
.GetSymbols()
.Do(_store.Put)
.Finally(() => _updating.OnNext(false));
})
.Publish();
_update = update;
_token = update
.Connect();
}
public IObservable<Symbol> Get(string provider)
{
// Get symbols from store by provider and concat stream
// if update is being performed
return _store
.Get(provider)
.Concat(_update
.Where(x => x.Provider.Equals(provider))
.Select(x => x.Symbol))
.TakeUntil(_updating.Where(x => x == false));
}