缓冲后的反应性滤波器、拆分和延迟

Reactive Filter, Split and Delay after Buffering

我正在尝试通过缓冲 30 秒然后通过仅采用特定记录 ID 的最新版本来缩小列表来更改可观察负载。 它仍然是一个大对象,所以我想通过在特定时间间隔内将它分成几个项目来分散负载。

如果缓冲时间是 30 秒,我想 Buffer/Shrink 加载然后发送 5 个小批次,每个批次之间间隔 5 秒

简化流程:

sequence
buffer
filter
split
delay
a-a-b-a-b-b-c-b-c-b-c-b-a-b-a-b-b-a-b-c-b-c-a-b-a-b-b-c-b-f-b-a-b-a-b-a-b-c-c-a-f-a-b-s-c
                          <aababbcbcbcbab>                  <abbabcbcababbcbfb>
                          <abc>                             <abcf>
                          <<ab>,<c>>                        <<ab>,<cf>> 
                          <ab>              <c>             <ab>              <cf>

真实例子:

Record
    int Id
    int Version

notifications  
    .Buffer(TimeSpan.FromSeconds(30), 20000)
    .Select(list => list.GroupBy(x=> x.Id)
                        .Select(group => group.OrderByDescending(x=> x.Version)
                                              .FirstOrDefault()).ToList())

这是一种方法。使用表达式 index * 5 / list.Count 作为 GroupBy 的键,将每个不同的组细分为 5 个子组,然后将 Delay 强加给每个子组,持续时间相对于其在父级中的索引不同的组。

IObservable<List<Record>> query = notifications
    .Buffer(TimeSpan.FromSeconds(30))
    .Select(list => list
        .GroupBy(x => x.Id)
        .Select(g => g.OrderByDescending(x => x.Version).FirstOrDefault())
        .ToList()
    )
    .SelectMany(list => list
        .Select((x, i) => (x, i))
        .GroupBy(e => e.i * 5 / list.Count, e => e.x)
        .Select(g => g.ToList())
        .Select((sublist, i) => Observable.Return(sublist)
            .Delay(TimeSpan.FromSeconds(5 * i)))
    )
    .Merge();