根据 begin/end 个事件对反应流中的事件进行分组
Grouping events in a reactive stream based on begin/end events
我有一个我观察到的反应流 Observable<Event>
,目前它直接发出事件。基于 BEGIN/END 事件,我想在内部组的支持下对这一系列事件进行分组。
输入流
我有一连串的事件如下:
Event(type = Data, groupId = 1)
Event(type = BeginGroup, groupId = 2) // outer group begins
Event(type = Data, groupId = 2)
Event(type = BeginGroup, groupId = 3) // inner group begins
Event(type = Data, groupId = 3)
Event(type = EndGroup, groupId = 3) // inner group ends
Event(type = EndGroup, groupId = 2) // outer group ends
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)
编辑 - 附加先决条件:
我在示例数据中添加了 ID,但通常不需要 ID。该流将注意满足以下条件:
- 每个
BeginGroup
事件都会在某个时候跟随相应的 EndGroup
事件
- 组内的事件(在我的示例中具有相同的组想法)将总是在流内的begin/end事件内,因此顺序得到保证(例如在上面的例子中)
DESIRED 输出流
因此,我确保每个事件都是其上方组的一部分,或者如果它不是真实组的一部分,则具有唯一 ID。我想将上述 9 个事件流分组到以下 4 个事件流:
Event(type = Data, groupId = 1)
GroupEvent(groupId = 2, data = <LIST of Events and/or sub groups>) with following data:
data = [
Event(type = BeginGroup, groupId = 2)
Event(type = Data, groupId = 2)
GroupEvent(groupId = 3, data = <LIST of Events and/or sub groups>) with following data:
data = [
Event(type = BeginGroup, groupId = 3)
Event(type = Data, groupId = 3)
Event(type = EndGroup, groupId = 3)
]
Event(type = EndGroup, groupId = 2)
]
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)
我要的-逻辑
我想在 BeginGroup
类型的事件发生后立即开始分组,直到正确的 EndGroup
事件发生,并将这两个事件之间的所有事件分组,包括最终嵌套的组事件。 begin/end 组事件之外的元素仅作为单个事件传递。
这是我目前尝试的方法
我在问这个问题之前尝试了一些东西,但我来自 java,我什至很少在那里使用 window/buffer 运算符,而且对它们的经验也很少。我看到 C# 中有类似 GroupByUntil
oeprator 的东西,所以我尝试使用它,但在我的示例中它从不发出任何东西。
var eventObservable: Observable<Event> = ...
// 1) make the observable hot so it can be resued inside the groupbyuntil operator
eventObservable = observable.Publish().RefCount();
var res = eventObservable
.GroupByUntil(
e => e.GroupId, // selector for groups => the group id can be used here
grp => eventObservable.Where(e => e.GroupId != grp.Key) // stop a group as soon as the group id changes
)
.SelectMany(data => data.ToList()) // flatten the observable
.Select(data => {
// Convert the list of Events to GroupEvent if it contains more than 1 event
var list = data.ToList();
if (list.Count == 1)
return list[0];
return new GroupEvent(list);
})
这种方法行不通,它根本不会发出任何东西(不过 eventObservable
确实会正确发出它的项目)。此外,它缺少对嵌套组的支持(理论上是否可行)。
有人可以向我解释如何解决我的问题吗?
先转储代码,再解释。
数据类我写:
public enum EventType
{
Data,
BeginGroup,
EndGroup,
Group
}
public class Event<T>
{
public Event(int id, EventType type, T data)
{
this.Id = id;
this.Type = type;
this.Data = data;
}
public int Id { get; set; }
public EventType Type { get; set; }
public T Data { get; set;}
}
public class GroupEvent<T> : Event<T> {
public GroupEvent(int id, IEnumerable<Event<T>> events)
: base(id, EventType.Group, default(T))
{
this.ChildData = events;
}
public IEnumerable<Event<T>> ChildData { get; set; }
}
这里是逻辑扩展方法(使用 Nuget 包 System.Collections.Immutable
):
public static class X
{
public static IObservable<Event<T>> GroupEvents<T>(this IObservable<Event<T>> source)
{
return source
.Scan((groupId: 1, stack: ImmutableStack<ImmutableList<Event<T>>>.Empty, output: (Event<T>)null), (state, inEvent) =>
{
if(inEvent.Type == EventType.Data)
{
if (state.stack.IsEmpty)
return (state.groupId + 1, state.stack, new Event<T>(state.groupId, EventType.Data, inEvent.Data));
else
{
var newEvent = new Event<T>(state.stack.Peek()[0].Id, EventType.Data, inEvent.Data);
var newList = state.stack.Peek().Add(newEvent);
var newStack = state.stack.Pop().Push(newList);
return (state.groupId, newStack, null);
}
}
if(inEvent.Type == EventType.BeginGroup)
{
var newEvent = new Event<T>(state.groupId, EventType.BeginGroup, inEvent.Data);
return (state.groupId + 1, state.stack.Push(ImmutableList<Event<T>>.Empty.Add(newEvent)), null);
}
if (inEvent.Type == EventType.EndGroup)
{
var newEvent = new Event<T>(state.stack.Peek()[0].Id, EventType.EndGroup, inEvent.Data);
var newList = state.stack.Peek().Add(newEvent);
var newStack = state.stack.Pop();
var toEmit = new GroupEvent<T>(newList[0].Id, newList);
if(newStack.IsEmpty)
return (state.groupId, newStack, toEmit);
else
{
var parentList = newStack.Peek().Add(toEmit);
newStack = newStack.Pop().Push(parentList);
return (state.groupId, newStack, null);
}
}
throw new NotImplementedException();
})
.Where(t => t.output != null)
.Select(t => t.output);
}
}
这是跑步者代码:
var s = new Subject<Event<int>>();
var o = s.GroupEvents();
s.OnNext(new Event<int>(-1, EventType.Data, 1));
s.OnNext(new Event<int>(-1, EventType.BeginGroup, 2));
s.OnNext(new Event<int>(-1, EventType.Data, 3));
s.OnNext(new Event<int>(-1, EventType.BeginGroup, 4));
s.OnNext(new Event<int>(-1, EventType.Data, 5));
s.OnNext(new Event<int>(-1, EventType.EndGroup, 6));
s.OnNext(new Event<int>(-1, EventType.EndGroup, 7));
s.OnNext(new Event<int>(-1, EventType.Data, 8));
s.OnNext(new Event<int>(-1, EventType.Data, 9));
结果与您在问题中预期的一样。
解释:
我采用了状态机方法,这通常意味着使用 .Scan
方法。我们这里的状态是 运行 groupId
计数,以及一堆消息列表。堆栈的顶部表示我们当前正在寻找将消息添加到哪个组。由于 Scan
不允许您区分状态和输出,我们的第三个状态值是输出变量。
我正在使用不可变集合,因为它们与 Rx 配合得最好。如果您使用可变集合,性能可以提高,但您必须注意多重订阅的影响。
至于使用内置运算符(GroupBy
、Buffer
、Window
、Join
等),我不认为这些将与您想要的树状递归结构配合使用。如果您对更扁平的结构没问题,那么 Window
可能会起作用,但它需要一些工作。
我有一个我观察到的反应流 Observable<Event>
,目前它直接发出事件。基于 BEGIN/END 事件,我想在内部组的支持下对这一系列事件进行分组。
输入流
我有一连串的事件如下:
Event(type = Data, groupId = 1)
Event(type = BeginGroup, groupId = 2) // outer group begins
Event(type = Data, groupId = 2)
Event(type = BeginGroup, groupId = 3) // inner group begins
Event(type = Data, groupId = 3)
Event(type = EndGroup, groupId = 3) // inner group ends
Event(type = EndGroup, groupId = 2) // outer group ends
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)
编辑 - 附加先决条件:
我在示例数据中添加了 ID,但通常不需要 ID。该流将注意满足以下条件:
- 每个
BeginGroup
事件都会在某个时候跟随相应的EndGroup
事件 - 组内的事件(在我的示例中具有相同的组想法)将总是在流内的begin/end事件内,因此顺序得到保证(例如在上面的例子中)
DESIRED 输出流
因此,我确保每个事件都是其上方组的一部分,或者如果它不是真实组的一部分,则具有唯一 ID。我想将上述 9 个事件流分组到以下 4 个事件流:
Event(type = Data, groupId = 1)
GroupEvent(groupId = 2, data = <LIST of Events and/or sub groups>) with following data:
data = [
Event(type = BeginGroup, groupId = 2)
Event(type = Data, groupId = 2)
GroupEvent(groupId = 3, data = <LIST of Events and/or sub groups>) with following data:
data = [
Event(type = BeginGroup, groupId = 3)
Event(type = Data, groupId = 3)
Event(type = EndGroup, groupId = 3)
]
Event(type = EndGroup, groupId = 2)
]
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)
我要的-逻辑
我想在 BeginGroup
类型的事件发生后立即开始分组,直到正确的 EndGroup
事件发生,并将这两个事件之间的所有事件分组,包括最终嵌套的组事件。 begin/end 组事件之外的元素仅作为单个事件传递。
这是我目前尝试的方法
我在问这个问题之前尝试了一些东西,但我来自 java,我什至很少在那里使用 window/buffer 运算符,而且对它们的经验也很少。我看到 C# 中有类似 GroupByUntil
oeprator 的东西,所以我尝试使用它,但在我的示例中它从不发出任何东西。
var eventObservable: Observable<Event> = ...
// 1) make the observable hot so it can be resued inside the groupbyuntil operator
eventObservable = observable.Publish().RefCount();
var res = eventObservable
.GroupByUntil(
e => e.GroupId, // selector for groups => the group id can be used here
grp => eventObservable.Where(e => e.GroupId != grp.Key) // stop a group as soon as the group id changes
)
.SelectMany(data => data.ToList()) // flatten the observable
.Select(data => {
// Convert the list of Events to GroupEvent if it contains more than 1 event
var list = data.ToList();
if (list.Count == 1)
return list[0];
return new GroupEvent(list);
})
这种方法行不通,它根本不会发出任何东西(不过 eventObservable
确实会正确发出它的项目)。此外,它缺少对嵌套组的支持(理论上是否可行)。
有人可以向我解释如何解决我的问题吗?
先转储代码,再解释。
数据类我写:
public enum EventType
{
Data,
BeginGroup,
EndGroup,
Group
}
public class Event<T>
{
public Event(int id, EventType type, T data)
{
this.Id = id;
this.Type = type;
this.Data = data;
}
public int Id { get; set; }
public EventType Type { get; set; }
public T Data { get; set;}
}
public class GroupEvent<T> : Event<T> {
public GroupEvent(int id, IEnumerable<Event<T>> events)
: base(id, EventType.Group, default(T))
{
this.ChildData = events;
}
public IEnumerable<Event<T>> ChildData { get; set; }
}
这里是逻辑扩展方法(使用 Nuget 包 System.Collections.Immutable
):
public static class X
{
public static IObservable<Event<T>> GroupEvents<T>(this IObservable<Event<T>> source)
{
return source
.Scan((groupId: 1, stack: ImmutableStack<ImmutableList<Event<T>>>.Empty, output: (Event<T>)null), (state, inEvent) =>
{
if(inEvent.Type == EventType.Data)
{
if (state.stack.IsEmpty)
return (state.groupId + 1, state.stack, new Event<T>(state.groupId, EventType.Data, inEvent.Data));
else
{
var newEvent = new Event<T>(state.stack.Peek()[0].Id, EventType.Data, inEvent.Data);
var newList = state.stack.Peek().Add(newEvent);
var newStack = state.stack.Pop().Push(newList);
return (state.groupId, newStack, null);
}
}
if(inEvent.Type == EventType.BeginGroup)
{
var newEvent = new Event<T>(state.groupId, EventType.BeginGroup, inEvent.Data);
return (state.groupId + 1, state.stack.Push(ImmutableList<Event<T>>.Empty.Add(newEvent)), null);
}
if (inEvent.Type == EventType.EndGroup)
{
var newEvent = new Event<T>(state.stack.Peek()[0].Id, EventType.EndGroup, inEvent.Data);
var newList = state.stack.Peek().Add(newEvent);
var newStack = state.stack.Pop();
var toEmit = new GroupEvent<T>(newList[0].Id, newList);
if(newStack.IsEmpty)
return (state.groupId, newStack, toEmit);
else
{
var parentList = newStack.Peek().Add(toEmit);
newStack = newStack.Pop().Push(parentList);
return (state.groupId, newStack, null);
}
}
throw new NotImplementedException();
})
.Where(t => t.output != null)
.Select(t => t.output);
}
}
这是跑步者代码:
var s = new Subject<Event<int>>();
var o = s.GroupEvents();
s.OnNext(new Event<int>(-1, EventType.Data, 1));
s.OnNext(new Event<int>(-1, EventType.BeginGroup, 2));
s.OnNext(new Event<int>(-1, EventType.Data, 3));
s.OnNext(new Event<int>(-1, EventType.BeginGroup, 4));
s.OnNext(new Event<int>(-1, EventType.Data, 5));
s.OnNext(new Event<int>(-1, EventType.EndGroup, 6));
s.OnNext(new Event<int>(-1, EventType.EndGroup, 7));
s.OnNext(new Event<int>(-1, EventType.Data, 8));
s.OnNext(new Event<int>(-1, EventType.Data, 9));
结果与您在问题中预期的一样。
解释:
我采用了状态机方法,这通常意味着使用 .Scan
方法。我们这里的状态是 运行 groupId
计数,以及一堆消息列表。堆栈的顶部表示我们当前正在寻找将消息添加到哪个组。由于 Scan
不允许您区分状态和输出,我们的第三个状态值是输出变量。
我正在使用不可变集合,因为它们与 Rx 配合得最好。如果您使用可变集合,性能可以提高,但您必须注意多重订阅的影响。
至于使用内置运算符(GroupBy
、Buffer
、Window
、Join
等),我不认为这些将与您想要的树状递归结构配合使用。如果您对更扁平的结构没问题,那么 Window
可能会起作用,但它需要一些工作。