RX Extension ToObservable() 在查询嵌套的 IColletion 时产生异常

RX Extension ToObservable() yields an exception when querying a nested IColletion

我只是想熟悉使用 ef 和 rx。不幸的是,我遇到了一个我无法解决的问题。我有一个名为 world.sql 的 MySQL 测试数据库。 EF 产生了以下内容

public worldEntities()
        : base("name=worldEntities")
    {
    }

    protected override void OnModelCreating(DbModelBuilder modelBuilder)
    {
        throw new UnintentionalCodeFirstException();
    }

    public virtual DbSet<city> city { get; set; }
    public virtual DbSet<country> country { get; set; }
    public virtual DbSet<countrylanguage> countrylanguage { get; set; }

我现在正在尝试比较“简单的”Linq 查询和 Rx 提供的 ToObservable() 替代方案。使用 Linq 我有以下有效的查询:

var m = new worldEntities();
        var res = m.country.Where(e => e.countrylanguage.Any(i=>i.Language.Equals("German"))).Select(e => e.Name);

但是我无法找出对应的 Rx。如果我尝试使用相同的方法

var set = m.country.ToObservable();
        set.Where(e => e.countrylanguage.Any(i => i.Language.Equals("German")))
            .Select(e => e.Name).Buffer(50).
            Subscribe(l=>
            Items.AddRange(l));

我将收到一个异常,其中包含以下内部消息:

{"There is already an open DataReader associated with this Connection which must be closed first."}

所以我的问题是,RX 查询应该是什么样子才能获得相同的结果。

在此先致谢并致以亲切的问候。

可能类似的东西应该有效:

    var set = m
.country
.Where(e => e.countrylanguage.Any(i => i.Language.Equals("German")))
.Select(e => e.Name)
.ToObservable();
            set.Buffer(50).
                Subscribe(l=>
                Items.AddRange(l));

然而,它是非常不寻常的 RX 扩展应用程序,绝对不是 "get comfortable with" 的最佳应用程序。

数据库是拉式系统。您请求从数据库中提取数据。 Rx 就是处理推送请求,而不是拉取请求。这不是一个很好的 Rx 用法。

如果您想学习 Rx,请开始使用它进行事件处理。