创建一个 IObservable<T> returns 适当地处理大量数据(异步?)

Creating an IObservable<T> that returns properly a ton of data (asynchronously?)

我对 IObservable<T> 不是很熟悉,但我正在使用的软件包在某种程度上强加了它。

我需要 return 一个 IObservable<T> 稍后调用 Subscribe,然后立即处理结果。我的意图是用来自海量数据集的数据填充它。它从文本文件中逐行读取 GB 的数据。

但我似乎无法找到一个很好的例子来说明如何使我的一个小时长的 while 循环以一种不期望预先读取所有数据的方式变成可观察的。 我看到 Observable.FromAsync 有一些变体,但任务也不是我的强项,而且似乎也无法让它发挥作用。

我目前最好的成绩如下。编译并运行,但什么也不做。据我所知,从来没有调用 Create 中的代码。

public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
    return Observable.Create<Data>(async (subject, token) =>
    {
        try
        {
            FileStream stream = null;
            StreamReader sr = null;
            DateTime date = startDate;
            string path = string.Format("C:\MYPATH\{0}-{1}-{2}.csv", date.Year,
                date.Month.ToString("00"), date.Day.ToString("00"));
            while (date < endDate)
            {
                if (!File.Exists(path))
                {
                    date.AddDays(1);
                    continue;
                }
                stream = File.Open(path, FileMode.Create, FileAccess.Read);
                sr = new StreamReader(stream);
                while (!sr.EndOfStream)
                {
                    string line = await sr.ReadLineAsync();
                    Data d = ParseData(line);
                    subject.OnNext(d);
                }
                if (stream != null)
                {
                    sr.Close();
                    stream.Close();
                }
            }
        }
        catch (Exception ex)
        {
            try
            {
                subject.OnError(ex);
            }
            catch (Exception)
            {
                Console.WriteLine("An exception was thrown while trying to call" +
                    " OnError on the observable subject -- means you're not" +
                    " catching exceptions");
                throw;
            }
        }
    }).Publish();
}

我什至不确定我想做的事情在技术上是否可行,因为我不确定 Observable 模式是如何工作的。但是由于上下文,它似乎期望服务器连接正常地为它提供 DataStream 。所以我认为这是可以做到的。但前提是 Observable 创建方法的正确组合。

如果有人有一些很好的文档可以阅读,以初学者友好的方式解释这一点,那也很好。

根据要求,该方法的调用方式如下,但它主要进入黑盒库。

IObservable<Data> data = GetHistoricalData(
    new DateTime(2021, 1, 1, 0, 0, 0, DateTimeKind.Utc),
    new DateTime(2021, 1, 5, 0, 0, 0, DateTimeKind.Utc));

// Build charts from data
IObservable<(Data, Chart)> dataWithChart = data.GenerateCharts(TimeFrame);

// Generate signals from the charts
IObservable<(Signal, Chart)> signalsWithChart = dataWithChart.GenerateSignals(
    Symbol, strategy);

// We're interested in the signals only
IObservable<Signal> signals = signalsWithChart.SelectSignals();

// Show information from each signal
IDisposable subscription = signals.Subscribe(ShowSignal);
subscription.Dispose();

我认为你应该阅读 "Introduction to Rx", especially the section about hot and cold observables, and publish and connect

您的代码存在几个较小的问题。 我为您的代码做了一个更简单的版本,在我删除了对 .Publish() 的调用后它就可以工作了。而且我相当确定您不希望出现在此处。

Publish 制作一个支持多个观察者的包装器。您可以使用 Publish() 使其适用于多个观察者,然后在所有观察者都订阅后调用 Connect。但是发布更适合“热”流,例如 mouse/keyboard 事件。您的数据只被读取一次。调用 Connect 后连接的任何可观察对象都不会获取已读取的数据。如果你需要这样的多个订阅者,我会 return IConnectableObservable 而不是 IObservable,所以你可以 Connect() 一旦所有观察者都订阅了它。

至于你的代码:

  • 保持简单。每当您使用流时,请使用 using,除非您非常清楚为什么不应该这样做。
  • 计算循环内的路径。使用字符串插值代替 string.Format().
  • 最后调用subject.OnCompleted()
  • 重新分配 date 变量。 DateTime C# 中的值是不可变的。 date.AddDays() 不修改 date,但 return 是一个新的 DateTime
  • 使用您真正想要的文件模式。如果您不打算在文件不存在时调用代码,则不需要 .Create。

这对我有用:

public IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
    return Observable.Create<Data>(async (subject, token) =>
    {
        var date = startDate;
        try
        {
            while (date < endDate)
            {
                string path = $@"C:\MYPATH\{date.Year}-{date.Month:00}-{date.Day:00}.csv";

                if (File.Exists(path))
                {
                    using (var stream = File.Open(path, FileMode.Open, FileAccess.Read))
                    using (var sr = new StreamReader(stream))
                    {
                        while (!sr.EndOfStream)
                        {
                            var line = await sr.ReadLineAsync();
                            if (!string.IsNullOrWhiteSpace(line))
                            {
                                var data = ParseData(line);
                                subject.OnNext(data);
                            }
                        }
                    }
                }

                date = date.AddDays(1);
            }
        } catch (Exception e)
        {
            subject.OnError(e);
        }
        
        subject.OnCompleted();
    });
}

给你。一个很好的查询:

public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate) =>
    from n in Observable.Range(0, int.MaxValue)
    let d = startDate.AddDays(n)
    where d < endDate
    let path = $"C:\MYPATH\{d.ToString("yyyy-MM-dd")}.csv"
    where File.Exists(path)
    from l in Observable.Using(
        () => File.Open(path, FileMode.Open, FileAccess.Read),
        s => Observable.Using(
            () => new StreamReader(s),
            sr => Observable.While(
                () => !sr.EndOfStream,
                Observable.Defer(
                    () => Observable.FromAsync(
                    () => sr.ReadLineAsync())))))
    select ParseData(l);

您的代码似乎被误用 IObservable<T> monad 的包所消耗,将其视为 IEnumerable<T>。当某人 Subscribes 到一个可观察的序列然后立即取消订阅时,他们只会观察在订阅期间同步推送的通知。这相当于同步枚举一个IEnumerable<T>。所以我的建议是通过编写基于 IEnumerable<T> 的代码来简化您的生活,然后在使用 ToObservable Rx 运算符将其转换为可观察对象后将序列传递给包:

public static IEnumerable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
    //...
    foreach (var line in File.ReadLines(path))
    {
        yield return ParseData(line);
    }
    //...
}

从可枚举到可观察的转换:

IObservable<Data> data = GetHistoricalData(date1, date2).ToObservable();