创建一个 IObservable<T> returns 适当地处理大量数据(异步?)
Creating an IObservable<T> that returns properly a ton of data (asynchronously?)
我对 IObservable<T>
不是很熟悉,但我正在使用的软件包在某种程度上强加了它。
我需要 return 一个 IObservable<T>
稍后调用 Subscribe,然后立即处理结果。我的意图是用来自海量数据集的数据填充它。它从文本文件中逐行读取 GB 的数据。
但我似乎无法找到一个很好的例子来说明如何使我的一个小时长的 while 循环以一种不期望预先读取所有数据的方式变成可观察的。
我看到 Observable.FromAsync
有一些变体,但任务也不是我的强项,而且似乎也无法让它发挥作用。
我目前最好的成绩如下。编译并运行,但什么也不做。据我所知,从来没有调用 Create
中的代码。
public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
return Observable.Create<Data>(async (subject, token) =>
{
try
{
FileStream stream = null;
StreamReader sr = null;
DateTime date = startDate;
string path = string.Format("C:\MYPATH\{0}-{1}-{2}.csv", date.Year,
date.Month.ToString("00"), date.Day.ToString("00"));
while (date < endDate)
{
if (!File.Exists(path))
{
date.AddDays(1);
continue;
}
stream = File.Open(path, FileMode.Create, FileAccess.Read);
sr = new StreamReader(stream);
while (!sr.EndOfStream)
{
string line = await sr.ReadLineAsync();
Data d = ParseData(line);
subject.OnNext(d);
}
if (stream != null)
{
sr.Close();
stream.Close();
}
}
}
catch (Exception ex)
{
try
{
subject.OnError(ex);
}
catch (Exception)
{
Console.WriteLine("An exception was thrown while trying to call" +
" OnError on the observable subject -- means you're not" +
" catching exceptions");
throw;
}
}
}).Publish();
}
我什至不确定我想做的事情在技术上是否可行,因为我不确定 Observable 模式是如何工作的。但是由于上下文,它似乎期望服务器连接正常地为它提供 DataStream
。所以我认为这是可以做到的。但前提是 Observable 创建方法的正确组合。
如果有人有一些很好的文档可以阅读,以初学者友好的方式解释这一点,那也很好。
根据要求,该方法的调用方式如下,但它主要进入黑盒库。
IObservable<Data> data = GetHistoricalData(
new DateTime(2021, 1, 1, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2021, 1, 5, 0, 0, 0, DateTimeKind.Utc));
// Build charts from data
IObservable<(Data, Chart)> dataWithChart = data.GenerateCharts(TimeFrame);
// Generate signals from the charts
IObservable<(Signal, Chart)> signalsWithChart = dataWithChart.GenerateSignals(
Symbol, strategy);
// We're interested in the signals only
IObservable<Signal> signals = signalsWithChart.SelectSignals();
// Show information from each signal
IDisposable subscription = signals.Subscribe(ShowSignal);
subscription.Dispose();
我认为你应该阅读 "Introduction to Rx", especially the section about hot and cold observables, and publish and connect。
您的代码存在几个较小的问题。
我为您的代码做了一个更简单的版本,在我删除了对 .Publish()
的调用后它就可以工作了。而且我相当确定您不希望出现在此处。
Publish
制作一个支持多个观察者的包装器。您可以使用 Publish()
使其适用于多个观察者,然后在所有观察者都订阅后调用 Connect
。但是发布更适合“热”流,例如 mouse/keyboard 事件。您的数据只被读取一次。调用 Connect
后连接的任何可观察对象都不会获取已读取的数据。如果你需要这样的多个订阅者,我会 return IConnectableObservable
而不是 IObservable
,所以你可以 Connect()
一旦所有观察者都订阅了它。
至于你的代码:
- 保持简单。每当您使用流时,请使用
using
,除非您非常清楚为什么不应该这样做。
- 计算循环内的路径。使用字符串插值代替
string.Format()
.
- 最后调用
subject.OnCompleted()
。
- 重新分配
date
变量。 DateTime
C# 中的值是不可变的。 date.AddDays()
不修改 date
,但 return 是一个新的 DateTime
。
- 使用您真正想要的文件模式。如果您不打算在文件不存在时调用代码,则不需要 .Create。
这对我有用:
public IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
return Observable.Create<Data>(async (subject, token) =>
{
var date = startDate;
try
{
while (date < endDate)
{
string path = $@"C:\MYPATH\{date.Year}-{date.Month:00}-{date.Day:00}.csv";
if (File.Exists(path))
{
using (var stream = File.Open(path, FileMode.Open, FileAccess.Read))
using (var sr = new StreamReader(stream))
{
while (!sr.EndOfStream)
{
var line = await sr.ReadLineAsync();
if (!string.IsNullOrWhiteSpace(line))
{
var data = ParseData(line);
subject.OnNext(data);
}
}
}
}
date = date.AddDays(1);
}
} catch (Exception e)
{
subject.OnError(e);
}
subject.OnCompleted();
});
}
给你。一个很好的查询:
public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate) =>
from n in Observable.Range(0, int.MaxValue)
let d = startDate.AddDays(n)
where d < endDate
let path = $"C:\MYPATH\{d.ToString("yyyy-MM-dd")}.csv"
where File.Exists(path)
from l in Observable.Using(
() => File.Open(path, FileMode.Open, FileAccess.Read),
s => Observable.Using(
() => new StreamReader(s),
sr => Observable.While(
() => !sr.EndOfStream,
Observable.Defer(
() => Observable.FromAsync(
() => sr.ReadLineAsync())))))
select ParseData(l);
您的代码似乎被误用 IObservable<T>
monad 的包所消耗,将其视为 IEnumerable<T>
。当某人 Subscribe
s 到一个可观察的序列然后立即取消订阅时,他们只会观察在订阅期间同步推送的通知。这相当于同步枚举一个IEnumerable<T>
。所以我的建议是通过编写基于 IEnumerable<T>
的代码来简化您的生活,然后在使用 ToObservable
Rx 运算符将其转换为可观察对象后将序列传递给包:
public static IEnumerable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
//...
foreach (var line in File.ReadLines(path))
{
yield return ParseData(line);
}
//...
}
从可枚举到可观察的转换:
IObservable<Data> data = GetHistoricalData(date1, date2).ToObservable();
我对 IObservable<T>
不是很熟悉,但我正在使用的软件包在某种程度上强加了它。
我需要 return 一个 IObservable<T>
稍后调用 Subscribe,然后立即处理结果。我的意图是用来自海量数据集的数据填充它。它从文本文件中逐行读取 GB 的数据。
但我似乎无法找到一个很好的例子来说明如何使我的一个小时长的 while 循环以一种不期望预先读取所有数据的方式变成可观察的。
我看到 Observable.FromAsync
有一些变体,但任务也不是我的强项,而且似乎也无法让它发挥作用。
我目前最好的成绩如下。编译并运行,但什么也不做。据我所知,从来没有调用 Create
中的代码。
public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
return Observable.Create<Data>(async (subject, token) =>
{
try
{
FileStream stream = null;
StreamReader sr = null;
DateTime date = startDate;
string path = string.Format("C:\MYPATH\{0}-{1}-{2}.csv", date.Year,
date.Month.ToString("00"), date.Day.ToString("00"));
while (date < endDate)
{
if (!File.Exists(path))
{
date.AddDays(1);
continue;
}
stream = File.Open(path, FileMode.Create, FileAccess.Read);
sr = new StreamReader(stream);
while (!sr.EndOfStream)
{
string line = await sr.ReadLineAsync();
Data d = ParseData(line);
subject.OnNext(d);
}
if (stream != null)
{
sr.Close();
stream.Close();
}
}
}
catch (Exception ex)
{
try
{
subject.OnError(ex);
}
catch (Exception)
{
Console.WriteLine("An exception was thrown while trying to call" +
" OnError on the observable subject -- means you're not" +
" catching exceptions");
throw;
}
}
}).Publish();
}
我什至不确定我想做的事情在技术上是否可行,因为我不确定 Observable 模式是如何工作的。但是由于上下文,它似乎期望服务器连接正常地为它提供 DataStream
。所以我认为这是可以做到的。但前提是 Observable 创建方法的正确组合。
如果有人有一些很好的文档可以阅读,以初学者友好的方式解释这一点,那也很好。
根据要求,该方法的调用方式如下,但它主要进入黑盒库。
IObservable<Data> data = GetHistoricalData(
new DateTime(2021, 1, 1, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2021, 1, 5, 0, 0, 0, DateTimeKind.Utc));
// Build charts from data
IObservable<(Data, Chart)> dataWithChart = data.GenerateCharts(TimeFrame);
// Generate signals from the charts
IObservable<(Signal, Chart)> signalsWithChart = dataWithChart.GenerateSignals(
Symbol, strategy);
// We're interested in the signals only
IObservable<Signal> signals = signalsWithChart.SelectSignals();
// Show information from each signal
IDisposable subscription = signals.Subscribe(ShowSignal);
subscription.Dispose();
我认为你应该阅读 "Introduction to Rx", especially the section about hot and cold observables, and publish and connect。
您的代码存在几个较小的问题。
我为您的代码做了一个更简单的版本,在我删除了对 .Publish()
的调用后它就可以工作了。而且我相当确定您不希望出现在此处。
Publish
制作一个支持多个观察者的包装器。您可以使用 Publish()
使其适用于多个观察者,然后在所有观察者都订阅后调用 Connect
。但是发布更适合“热”流,例如 mouse/keyboard 事件。您的数据只被读取一次。调用 Connect
后连接的任何可观察对象都不会获取已读取的数据。如果你需要这样的多个订阅者,我会 return IConnectableObservable
而不是 IObservable
,所以你可以 Connect()
一旦所有观察者都订阅了它。
至于你的代码:
- 保持简单。每当您使用流时,请使用
using
,除非您非常清楚为什么不应该这样做。 - 计算循环内的路径。使用字符串插值代替
string.Format()
. - 最后调用
subject.OnCompleted()
。 - 重新分配
date
变量。DateTime
C# 中的值是不可变的。date.AddDays()
不修改date
,但 return 是一个新的DateTime
。 - 使用您真正想要的文件模式。如果您不打算在文件不存在时调用代码,则不需要 .Create。
这对我有用:
public IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
return Observable.Create<Data>(async (subject, token) =>
{
var date = startDate;
try
{
while (date < endDate)
{
string path = $@"C:\MYPATH\{date.Year}-{date.Month:00}-{date.Day:00}.csv";
if (File.Exists(path))
{
using (var stream = File.Open(path, FileMode.Open, FileAccess.Read))
using (var sr = new StreamReader(stream))
{
while (!sr.EndOfStream)
{
var line = await sr.ReadLineAsync();
if (!string.IsNullOrWhiteSpace(line))
{
var data = ParseData(line);
subject.OnNext(data);
}
}
}
}
date = date.AddDays(1);
}
} catch (Exception e)
{
subject.OnError(e);
}
subject.OnCompleted();
});
}
给你。一个很好的查询:
public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate) =>
from n in Observable.Range(0, int.MaxValue)
let d = startDate.AddDays(n)
where d < endDate
let path = $"C:\MYPATH\{d.ToString("yyyy-MM-dd")}.csv"
where File.Exists(path)
from l in Observable.Using(
() => File.Open(path, FileMode.Open, FileAccess.Read),
s => Observable.Using(
() => new StreamReader(s),
sr => Observable.While(
() => !sr.EndOfStream,
Observable.Defer(
() => Observable.FromAsync(
() => sr.ReadLineAsync())))))
select ParseData(l);
您的代码似乎被误用 IObservable<T>
monad 的包所消耗,将其视为 IEnumerable<T>
。当某人 Subscribe
s 到一个可观察的序列然后立即取消订阅时,他们只会观察在订阅期间同步推送的通知。这相当于同步枚举一个IEnumerable<T>
。所以我的建议是通过编写基于 IEnumerable<T>
的代码来简化您的生活,然后在使用 ToObservable
Rx 运算符将其转换为可观察对象后将序列传递给包:
public static IEnumerable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
//...
foreach (var line in File.ReadLines(path))
{
yield return ParseData(line);
}
//...
}
从可枚举到可观察的转换:
IObservable<Data> data = GetHistoricalData(date1, date2).ToObservable();