RX Subject, group/correlate 直到满足一个条件
RX Subject, group/correlate until a condition is met
我有一个项目流,我想创建一个按 属性 分组的订阅,直到满足条件。
例如,假设我想按名称对 EventItem
进行分组,直到观察到特定的 Id
。这样做的目的是关联事件,直到我们看到一个特定的 Id
表示将不再有事件关联。然后我想对每组相关事件执行一些操作。
public class EventItem
{
public int Id { get; set }
public string Name { get; set; }
}
// Using a Subject since it seems the simplest way
Subject<EventItem> eventStream;
...
// A seperate thread pushes EventItem objects into the Subject
eventStream.OnNext(eventItem);
// Struggling here...
IDisposable subscription = eventStream.????
我尝试了 GroupBy
、GroupByUntil
、TakeUntil
、TakeWhile
等的几种组合,但我不知道该怎么做(我我对 Rx 相当缺乏经验)
编辑
基于@Shlomo 的回答和 OP 关于希望它不完成的评论
var subscription = producer
.TakeWhile(ei => ei.Id != 42)
.GroupBy(x => x.Name)
.Select(o => o)
.SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out.
.Repeat()
.Subscribe();
原答案
这个有用吗?
Subject<Notification> producer = new Subject<Notification>();
//This way there's only one producer feeding the group and the duration
var connectedProducer =
producer
.Publish()
.RefCount();
connectedProducer
.GroupByUntil(
item => item.Name,
item => connectedProducer.Where(x=> x.Id == 3))
.SelectMany(result =>
{
return result.Aggregate<Notification, List<Notification>>(new List<Notification>(), (dict, item) =>
{
dict.Add(item);
return dict;
});
})
//not sure if you need this but just a way to filter out the signal
.Where(item => item.First().Id != 3)
.Subscribe(results =>
{
//This will only run after a "3" is passed in and then you get a list of everything passed in with the names
//not sure if you wanted intermediate results or for it all to just come through once the signal indicates processing
});
我根据我之前的一个回答做了一些修改。
我知道已经有一个可接受的答案,但它似乎过于复杂。处理@Enigmativity 的评论:
var subscription = eventStream
.GroupBy(x => x.Name)
.Select(o => o.TakeWhile(ei => ei.Id != 42))
.SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out.
.Subscribe(y => {});
我有一个项目流,我想创建一个按 属性 分组的订阅,直到满足条件。
例如,假设我想按名称对 EventItem
进行分组,直到观察到特定的 Id
。这样做的目的是关联事件,直到我们看到一个特定的 Id
表示将不再有事件关联。然后我想对每组相关事件执行一些操作。
public class EventItem
{
public int Id { get; set }
public string Name { get; set; }
}
// Using a Subject since it seems the simplest way
Subject<EventItem> eventStream;
...
// A seperate thread pushes EventItem objects into the Subject
eventStream.OnNext(eventItem);
// Struggling here...
IDisposable subscription = eventStream.????
我尝试了 GroupBy
、GroupByUntil
、TakeUntil
、TakeWhile
等的几种组合,但我不知道该怎么做(我我对 Rx 相当缺乏经验)
编辑
基于@Shlomo 的回答和 OP 关于希望它不完成的评论
var subscription = producer
.TakeWhile(ei => ei.Id != 42)
.GroupBy(x => x.Name)
.Select(o => o)
.SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out.
.Repeat()
.Subscribe();
原答案
这个有用吗?
Subject<Notification> producer = new Subject<Notification>();
//This way there's only one producer feeding the group and the duration
var connectedProducer =
producer
.Publish()
.RefCount();
connectedProducer
.GroupByUntil(
item => item.Name,
item => connectedProducer.Where(x=> x.Id == 3))
.SelectMany(result =>
{
return result.Aggregate<Notification, List<Notification>>(new List<Notification>(), (dict, item) =>
{
dict.Add(item);
return dict;
});
})
//not sure if you need this but just a way to filter out the signal
.Where(item => item.First().Id != 3)
.Subscribe(results =>
{
//This will only run after a "3" is passed in and then you get a list of everything passed in with the names
//not sure if you wanted intermediate results or for it all to just come through once the signal indicates processing
});
我根据我之前的一个回答做了一些修改。
我知道已经有一个可接受的答案,但它似乎过于复杂。处理@Enigmativity 的评论:
var subscription = eventStream
.GroupBy(x => x.Name)
.Select(o => o.TakeWhile(ei => ei.Id != 42))
.SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out.
.Subscribe(y => {});