根据 begin/end 个事件对反应流中的事件进行分组

Grouping events in a reactive stream based on begin/end events

我有一个我观察到的反应流 Observable<Event>,目前它直接发出事件。基于 BEGIN/END 事件,我想在内部组的支持下对这一系列事件进行分组。

输入流

我有一连串的事件如下:

Event(type = Data, groupId = 1)
Event(type = BeginGroup, groupId = 2)   // outer group begins
Event(type = Data, groupId = 2)
Event(type = BeginGroup, groupId = 3)   // inner group begins
Event(type = Data, groupId = 3)
Event(type = EndGroup, groupId = 3)     // inner group ends
Event(type = EndGroup, groupId = 2)     // outer group ends
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)

编辑 - 附加先决条件:

我在示例数据中添加了 ID,但通常不需要 ID。该流将注意满足以下条件:

DESIRED 输出流

因此,我确保每个事件都是其上方组的一部分,或者如果它不是真实组的一部分,则具有唯一 ID。我想将上述 9 个事件流分组到以下 4 个事件流:

Event(type = Data, groupId = 1)
GroupEvent(groupId = 2, data = <LIST of Events and/or sub groups>) with following data:
    data = [
        Event(type = BeginGroup, groupId = 2)
        Event(type = Data, groupId = 2)
        GroupEvent(groupId = 3, data = <LIST of Events and/or sub groups>) with following data:
            data = [
                Event(type = BeginGroup, groupId = 3)
                Event(type = Data, groupId = 3)
                Event(type = EndGroup, groupId = 3)
            ]
        Event(type = EndGroup, groupId = 2)
    ]
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)

我要的-逻辑

我想在 BeginGroup 类型的事件发生后立即开始分组,直到正确的 EndGroup 事件发生,并将这两个事件之间的所有事件分组,包括最终嵌套的组事件。 begin/end 组事件之外的元素仅作为单个事件传递。

这是我目前尝试的方法

我在问这个问题之前尝试了一些东西,但我来自 java,我什至很少在那里使用 window/buffer 运算符,而且对它们的经验也很少。我看到 C# 中有类似 GroupByUntil oeprator 的东西,所以我尝试使用它,但在我的示例中它从不发出任何东西。

var eventObservable: Observable<Event> = ...

// 1) make the observable hot so it can be resued inside the groupbyuntil operator
eventObservable = observable.Publish().RefCount();
var res = eventObservable
    .GroupByUntil(
        e => e.GroupId, // selector for groups => the group id can be used here
        grp => eventObservable.Where(e => e.GroupId != grp.Key) // stop a group as soon as the group id changes
    )
    .SelectMany(data => data.ToList()) // flatten the observable
    .Select(data => {
        // Convert the list of Events to GroupEvent if it contains more than 1 event
        var list = data.ToList();
        if (list.Count == 1)
            return list[0];
        return new GroupEvent(list);
    })

这种方法行不通,它根本不会发出任何东西(不过 eventObservable 确实会正确发出它的项目)。此外,它缺少对嵌套组的支持(理论上是否可行)。

有人可以向我解释如何解决我的问题吗?

先转储代码,再解释。

数据类我写:

public enum EventType
{
    Data,
    BeginGroup,
    EndGroup,
    Group
}

public class Event<T>
{
    public Event(int id, EventType type, T data)
    {
        this.Id = id;
        this.Type = type;
        this.Data = data;
    }

    public int Id { get; set; }
    public EventType Type { get; set; }
    public T Data { get; set;}
}

public class GroupEvent<T> : Event<T> {
    public GroupEvent(int id, IEnumerable<Event<T>> events)
        : base(id, EventType.Group, default(T))
    {
        this.ChildData = events;
    }

    public IEnumerable<Event<T>> ChildData { get; set; }
}

这里是逻辑扩展方法(使用 Nuget 包 System.Collections.Immutable):

public static class X
{
    public static IObservable<Event<T>> GroupEvents<T>(this IObservable<Event<T>> source)
    {
        return source
            .Scan((groupId: 1, stack: ImmutableStack<ImmutableList<Event<T>>>.Empty, output: (Event<T>)null), (state, inEvent) =>
            {
                if(inEvent.Type == EventType.Data)
                {
                    if (state.stack.IsEmpty)
                        return (state.groupId + 1, state.stack, new Event<T>(state.groupId, EventType.Data, inEvent.Data));
                    else
                    {
                        var newEvent = new Event<T>(state.stack.Peek()[0].Id, EventType.Data, inEvent.Data);
                        var newList = state.stack.Peek().Add(newEvent);
                        var newStack = state.stack.Pop().Push(newList);
                        return (state.groupId, newStack, null);
                    }
                }

                if(inEvent.Type == EventType.BeginGroup)
                {
                    var newEvent = new Event<T>(state.groupId, EventType.BeginGroup, inEvent.Data);
                    return (state.groupId + 1, state.stack.Push(ImmutableList<Event<T>>.Empty.Add(newEvent)), null);
                }

                if (inEvent.Type == EventType.EndGroup)
                {
                    var newEvent = new Event<T>(state.stack.Peek()[0].Id, EventType.EndGroup, inEvent.Data);
                    var newList = state.stack.Peek().Add(newEvent);
                    var newStack = state.stack.Pop();
                    var toEmit = new GroupEvent<T>(newList[0].Id, newList);
                    if(newStack.IsEmpty)
                        return (state.groupId, newStack, toEmit);
                    else
                    {
                        var parentList = newStack.Peek().Add(toEmit);
                        newStack = newStack.Pop().Push(parentList);
                        return (state.groupId, newStack, null);
                    }
                }

                throw new NotImplementedException();
            })
            .Where(t => t.output != null)
            .Select(t => t.output);
    }
}

这是跑步者代码:

var s = new Subject<Event<int>>();
var o = s.GroupEvents();

s.OnNext(new Event<int>(-1, EventType.Data, 1));
s.OnNext(new Event<int>(-1, EventType.BeginGroup, 2));
s.OnNext(new Event<int>(-1, EventType.Data, 3));
s.OnNext(new Event<int>(-1, EventType.BeginGroup, 4));
s.OnNext(new Event<int>(-1, EventType.Data, 5));
s.OnNext(new Event<int>(-1, EventType.EndGroup, 6));
s.OnNext(new Event<int>(-1, EventType.EndGroup, 7));
s.OnNext(new Event<int>(-1, EventType.Data, 8));
s.OnNext(new Event<int>(-1, EventType.Data, 9));

结果与您在问题中预期的一样。


解释:

我采用了状态机方法,这通常意味着使用 .Scan 方法。我们这里的状态是 运行 groupId 计数,以及一堆消息列表。堆栈的顶部表示我们当前正在寻找将消息添加到哪个组。由于 Scan 不允许您区分状态和输出,我们的第三个状态值是输出变量。

我正在使用不可变集合,因为它们与 Rx 配合得最好。如果您使用可变集合,性能可以提高,但您必须注意多重订阅的影响。

至于使用内置运算符(GroupByBufferWindowJoin 等),我不认为这些将与您想要的树状递归结构配合使用。如果您对更扁平的结构没问题,那么 Window 可能会起作用,但它需要一些工作。