缓冲后的反应性滤波器、拆分和延迟
Reactive Filter, Split and Delay after Buffering
我正在尝试通过缓冲 30 秒然后通过仅采用特定记录 ID 的最新版本来缩小列表来更改可观察负载。
它仍然是一个大对象,所以我想通过在特定时间间隔内将它分成几个项目来分散负载。
如果缓冲时间是 30 秒,我想 Buffer/Shrink 加载然后发送 5 个小批次,每个批次之间间隔 5 秒
简化流程:
sequence
buffer
filter
split
delay
a-a-b-a-b-b-c-b-c-b-c-b-a-b-a-b-b-a-b-c-b-c-a-b-a-b-b-c-b-f-b-a-b-a-b-a-b-c-c-a-f-a-b-s-c
<aababbcbcbcbab> <abbabcbcababbcbfb>
<abc> <abcf>
<<ab>,<c>> <<ab>,<cf>>
<ab> <c> <ab> <cf>
真实例子:
Record
int Id
int Version
notifications
.Buffer(TimeSpan.FromSeconds(30), 20000)
.Select(list => list.GroupBy(x=> x.Id)
.Select(group => group.OrderByDescending(x=> x.Version)
.FirstOrDefault()).ToList())
这是一种方法。使用表达式 index * 5 / list.Count
作为 GroupBy
的键,将每个不同的组细分为 5 个子组,然后将 Delay
强加给每个子组,持续时间相对于其在父级中的索引不同的组。
IObservable<List<Record>> query = notifications
.Buffer(TimeSpan.FromSeconds(30))
.Select(list => list
.GroupBy(x => x.Id)
.Select(g => g.OrderByDescending(x => x.Version).FirstOrDefault())
.ToList()
)
.SelectMany(list => list
.Select((x, i) => (x, i))
.GroupBy(e => e.i * 5 / list.Count, e => e.x)
.Select(g => g.ToList())
.Select((sublist, i) => Observable.Return(sublist)
.Delay(TimeSpan.FromSeconds(5 * i)))
)
.Merge();
我正在尝试通过缓冲 30 秒然后通过仅采用特定记录 ID 的最新版本来缩小列表来更改可观察负载。 它仍然是一个大对象,所以我想通过在特定时间间隔内将它分成几个项目来分散负载。
如果缓冲时间是 30 秒,我想 Buffer/Shrink 加载然后发送 5 个小批次,每个批次之间间隔 5 秒
简化流程:
sequence
buffer
filter
split
delay
a-a-b-a-b-b-c-b-c-b-c-b-a-b-a-b-b-a-b-c-b-c-a-b-a-b-b-c-b-f-b-a-b-a-b-a-b-c-c-a-f-a-b-s-c
<aababbcbcbcbab> <abbabcbcababbcbfb>
<abc> <abcf>
<<ab>,<c>> <<ab>,<cf>>
<ab> <c> <ab> <cf>
真实例子:
Record
int Id
int Version
notifications
.Buffer(TimeSpan.FromSeconds(30), 20000)
.Select(list => list.GroupBy(x=> x.Id)
.Select(group => group.OrderByDescending(x=> x.Version)
.FirstOrDefault()).ToList())
这是一种方法。使用表达式 index * 5 / list.Count
作为 GroupBy
的键,将每个不同的组细分为 5 个子组,然后将 Delay
强加给每个子组,持续时间相对于其在父级中的索引不同的组。
IObservable<List<Record>> query = notifications
.Buffer(TimeSpan.FromSeconds(30))
.Select(list => list
.GroupBy(x => x.Id)
.Select(g => g.OrderByDescending(x => x.Version).FirstOrDefault())
.ToList()
)
.SelectMany(list => list
.Select((x, i) => (x, i))
.GroupBy(e => e.i * 5 / list.Count, e => e.x)
.Select(g => g.ToList())
.Select((sublist, i) => Observable.Return(sublist)
.Delay(TimeSpan.FromSeconds(5 * i)))
)
.Merge();