RX Subject, group/correlate 直到满足一个条件

RX Subject, group/correlate until a condition is met

我有一个项目流,我想创建一个按 属性 分组的订阅,直到满足条件。

例如,假设我想按名称对 EventItem 进行分组,直到观察到特定的 Id。这样做的目的是关联事件,直到我们看到一个特定的 Id 表示将不再有事件关联。然后我想对每组相关事件执行一些操作。

public class EventItem
{
  public int Id { get; set }
  public string Name { get; set; }
}

// Using a Subject since it seems the simplest way
Subject<EventItem> eventStream;
...

// A seperate thread pushes EventItem objects into the Subject
eventStream.OnNext(eventItem);

// Struggling here...
IDisposable subscription = eventStream.????

我尝试了 GroupByGroupByUntilTakeUntilTakeWhile 等的几种组合,但我不知道该怎么做(我我对 Rx 相当缺乏经验)

编辑

基于@Shlomo 的回答和 OP 关于希望它不完成的评论

var subscription = producer
        .TakeWhile(ei => ei.Id != 42)
        .GroupBy(x => x.Name)
        .Select(o => o)
        .SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out.
        .Repeat()
        .Subscribe();

原答案

这个有用吗?

        Subject<Notification> producer = new Subject<Notification>();

        //This way there's only one producer feeding the group and the duration
        var connectedProducer =
            producer
                .Publish()
                .RefCount();

        connectedProducer
            .GroupByUntil(
                item => item.Name,
                item => connectedProducer.Where(x=> x.Id == 3))
            .SelectMany(result =>
            {
                return result.Aggregate<Notification, List<Notification>>(new List<Notification>(), (dict, item) =>
                {
                    dict.Add(item);
                    return dict;
                });
            })
            //not sure if you need this but just a way to filter out the signal
            .Where(item => item.First().Id != 3) 
            .Subscribe(results =>
            {
                //This will only run after a "3" is passed in and then you get a list of everything passed in with the names
                //not sure if you wanted intermediate results or for it all to just come through once the signal indicates processing
            });

我根据我之前的一个回答做了一些修改。

我知道已经有一个可接受的答案,但它似乎过于复杂。处理@Enigmativity 的评论:

var subscription = eventStream
    .GroupBy(x => x.Name)
    .Select(o => o.TakeWhile(ei => ei.Id != 42))
    .SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out.
    .Subscribe(y => {});