如何通过键对反应式可观察对象进行分区并将分区合并为组结合组中每个分区的最后一个元素

How to partition a reactive observable by a key and merge the partitions into groups combine last element of each partition of the group

我有一系列项目的热观察,这些项目具有标识特定子流的键。我有兴趣将这些 M 流映射到 N,其中 N < M(将它们分组到 N 个桶中)。对于每个桶,每次一个元素到达时,我想对该组的每个下划线序列的最新元素应用一个函数。我对 N 组和 M 组都有先验知识。

在下面的示例中,我们有四种水果的报价序列。我想根据水果的类型(苹果或梨)将这些流映射成两部分。对于每个组,我想收集每个水果的最后已知报价。

class Input {
    public string ProductID {get;set;}
    public string ProductType {get;set;}
    public int    Price {get;set;}
}

class Output {
    public string  ProductType {get;set;}
    public Input[] Underlining {get;set;}
}

var obs = new List<Input> {
    new Input { ProductID = "Stark",    ProductType = "Apple", Price = 21 },
    new Input { ProductID = "Jonagold", ProductType = "Apple", Price = 12 },
    new Input { ProductID = "Williams", ProductType = "Pear",  Price = 33 },
    new Input { ProductID = "Beth",     ProductType = "Pear",  Price = 22 },
    new Input { ProductID = "Stark",    ProductType = "Apple", Price = 43 },
    new Input { ProductID = "Williams", ProductType = "Pear",  Price = 55 },
    new Input { ProductID = "Beth",     ProductType = "Pear",  Price = 66 },
    new Input { ProductID = "Jonagold", ProductType = "Apple", Price = 77 },
    new Input { ProductID = "Jonagold", ProductType = "Apple", Price = 25 },
    new Input { ProductID = "Williams", ProductType = "Pear",  Price = 77 },
    new Input { ProductID = "Beth",     ProductType = "Pear",  Price = 13 },
    new Input { ProductID = "Stark",    ProductType = "Apple", Price = 21 },
}.ToObservable();

IObservable<Output> result = obs.GroupBy ... Select ... Concat ... ; // I'm a bit loss here 


result.Dump();

预期结果:

{ ProductType = "Apple", Underlining = [{ ProductID = "Stark",    Price = 21 }] }
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark",    Price = 21 },     { ProductID = "Jonagold", Price = 12 }] }
{ ProductType = "Pear",  Underlining = [{ ProductID = "Williams", Price = 23 }] }
{ ProductType = "Pear",  Underlining = [{ ProductID = "Williams", Price = 23 },     { ProductID = "Beth",     Price = 22 }] }
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark",    Price = **43** }, { ProductID = "Jonagold", Price = 12 }] }
{ ProductType = "Pear",  Underlining = [{ ProductID = "Williams", Price = **55** }, { ProductID = "Beth",     Price = 22 }] }
{ ProductType = "Pear",  Underlining = [{ ProductID = "Williams", Price = 55 },     { ProductID = "Beth",     Price = **66** }] }
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark",    Price = 43 },     { ProductID = "Jonagold", Price = **77** }] }
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark",    Price = 43 },     { ProductID = "Jonagold", Price = **25** }] }
{ ProductType = "Pear",  Underlining = [{ ProductID = "Williams", Price = **77** }, { ProductID = "Beth",     Price = 66 }] }
{ ProductType = "Pear",  Underlining = [{ ProductID = "Williams", Price = 77 },     { ProductID = "Beth",     Price = **13** }] }
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark",    Price = **21** }, { ProductID = "Jonagold", Price = 25 }] }

我想这就是你想要的:

var outputs =
    obs
        .GroupBy(x => x.ProductType)
        .Select(xs =>
            xs
                .Scan(
                    new Dictionary<string, Input>(),
                    (d, x) => { d[x.ProductID] = x; return d; })
                .Select(x => new Output()
                {
                    ProductType = xs.Key,
                    Underlining = x.Values.ToArray(),
                }))
        .Merge();

我使用 outputs.Select(x => $"{{ ProductType = \"{x.ProductType}\", Underlining = [{String.Join(", ", x.Underlining.Select(y => $"{{ ProductID = \"{y.ProductID}\", Price = {y.Price} }}"))}] }}") 得到以下输出来测试它:

{ ProductType = "Apple", Underlining = [{ ProductID = "Stark", Price = 21 }] } 
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark", Price = 21 }, { ProductID = "Jonagold", Price = 12 }] } 
{ ProductType = "Pear", Underlining = [{ ProductID = "Williams", Price = 33 }] } 
{ ProductType = "Pear", Underlining = [{ ProductID = "Williams", Price = 33 }, { ProductID = "Beth", Price = 22 }] } 
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark", Price = 43 }, { ProductID = "Jonagold", Price = 12 }] } 
{ ProductType = "Pear", Underlining = [{ ProductID = "Williams", Price = 55 }, { ProductID = "Beth", Price = 22 }] } 
{ ProductType = "Pear", Underlining = [{ ProductID = "Williams", Price = 55 }, { ProductID = "Beth", Price = 66 }] } 
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark", Price = 43 }, { ProductID = "Jonagold", Price = 77 }] } 
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark", Price = 43 }, { ProductID = "Jonagold", Price = 25 }] } 
{ ProductType = "Pear", Underlining = [{ ProductID = "Williams", Price = 77 }, { ProductID = "Beth", Price = 66 }] } 
{ ProductType = "Pear", Underlining = [{ ProductID = "Williams", Price = 77 }, { ProductID = "Beth", Price = 13 }] } 
{ ProductType = "Apple", Underlining = [{ ProductID = "Stark", Price = 21 }, { ProductID = "Jonagold", Price = 25 }] }