当所有源都包含给定键时加入未知数量的源
Join an unknown number of sources when all sources contain given key
给定如下来源提供者:
IObservable<ISource> Sources();
每个 ISource 如下所示:
IObservable<IEnumerable<string>> ObserveData(string filter)
我想 return:
IObservable<IEnumerable<string>> Results
当从所有 ISources returned 给定字符串时。本质上我想要所有来源的交集。
如果添加了新的来源,那么一切都应该重新评估。
我正在努力想出一个通用的解决方案。我见过的大多数解决方案都有很多众所周知的来源。
任何想法表示赞赏。
回答
好的,经过一段时间的思考,我想出了我的答案。可能它可以改进,但它似乎对我有用,所以我将 post 放在这里以供参考,以防其他人遇到类似问题。感谢ibebbs和Shlomo抽空回复,不胜感激
//Arrange
var s1 = Substitute.For<ISource>();
s1.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "a", "b", "c", "d" }));
var s2 = Substitute.For<ISource>();
s2.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "b", "xx", "c", "d" }));
var s3 = Substitute.For<ISource>();
s3.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "yy", "b", "ff", "d" }));
var expected = new[] { "b", "d" };
var sources = new[] { s1, s2, s3 }.ToObservable();
var scheduler = new TestScheduler();
var observer = scheduler.CreateObserver<IList<string>>();
//Act
sources.Buffer(TimeSpan.FromMilliseconds(500), scheduler)
.Select(s => Observable.CombineLatest(s.Select(x => x.ObserveData("NoFilter"))))
.Switch()
.Select(x =>IntersectAll(x))
.Do(x => Console.WriteLine($"Recieved {string.Join("," , x)}"))
.Subscribe(observer);
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks);
//Assert
observer.Messages.AssertEqual(
OnNext<IList<string>>(0, s => s.SequenceEqual(expected)),
OnCompleted<IList<string>>(0));
对于 IntersectAll,请参阅 Intersection of multiple lists with IEnumerable.Intersect()
这个怎么样:
public IObservable<IEnumerable<string>> From(this IObservable<ISource> sources, string filter)
{
return sources
.Scan(Observable.Empty<IEnumerable<string>>(), (agg, source) => Observable.Merge(agg, source.ObserveData(filter)))
.Switch();
}
请注意,每次从 sources
发出新的源时,之前发出的所有源都会再次调用其 ObserveData
方法。因此,此解决方案的扩展性不是特别好,但确实满足您的 'If a new source is added then everything should re-evaluate' 要求
好的,第二次尝试,我很确定这就是您需要的(底部包含测试夹具):
public interface ISource
{
IObservable<IEnumerable<string>> ObserveData(string filter);
}
public static class ArbitrarySources
{
public static IObservable<IEnumerable<string>> Intersection(this IObservable<ISource> sourceObservable, string filter)
{
return sourceObservable
.SelectMany((source, index) => source.ObserveData(filter).Select(values => new { Index = index, Values = values }))
.Scan(ImmutableDictionary<int, IEnumerable<string>>.Empty, (agg, tuple) => agg.SetItem(tuple.Index, tuple.Values))
.Select(dictionary => dictionary.Values.Aggregate(Enumerable.Empty<string>(), (agg, values) => agg.Any() ? agg.Intersect(values) : values).ToArray());
}
}
public class IntersectionTest
{
internal class Source : ISource
{
private readonly IObservable<IEnumerable<string>> _observable;
public Source(IObservable<IEnumerable<string>> observable)
{
_observable = observable;
}
public IObservable<IEnumerable<string>> ObserveData(string filter)
{
return _observable;
}
}
[Fact]
public void ShouldIntersectValues()
{
TestScheduler scheduler = new TestScheduler();
var sourceA = new Source(scheduler.CreateColdObservable(
new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "b" })),
new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "b", "c" }))
));
var sourceB = new Source(scheduler.CreateColdObservable(
new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "c" })),
new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "b", "c" }))
));
var sources = scheduler.CreateColdObservable(
new Recorded<Notification<ISource>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<ISource>(sourceA)),
new Recorded<Notification<ISource>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext<ISource>(sourceB))
);
var observer = scheduler.Start(() => sources.Intersection("test"), 0, 0, TimeSpan.FromSeconds(6).Ticks);
IEnumerable<string>[] actual = observer.Messages
.Select(message => message.Value)
.Where(notification => notification.Kind == NotificationKind.OnNext && notification.HasValue)
.Select(notification => notification.Value)
.ToArray();
IEnumerable<string>[] expected = new []
{
new [] { "a", "b" },
new [] { "a" },
new [] { "a", "c" },
new [] { "b", "c" }
};
Assert.Equal(expected.Length, actual.Length);
foreach (var tuple in expected.Zip(actual, (e, a) => new { Expected = e, Actual = a }))
{
Assert.Equal(tuple.Expected, tuple.Actual);
}
}
}
这种方法的额外好处是在添加新源时不重新查询现有源,但每次任何源发出值时都会重新计算交集。
给定如下来源提供者:
IObservable<ISource> Sources();
每个 ISource 如下所示:
IObservable<IEnumerable<string>> ObserveData(string filter)
我想 return:
IObservable<IEnumerable<string>> Results
当从所有 ISources returned 给定字符串时。本质上我想要所有来源的交集。
如果添加了新的来源,那么一切都应该重新评估。
我正在努力想出一个通用的解决方案。我见过的大多数解决方案都有很多众所周知的来源。 任何想法表示赞赏。
回答 好的,经过一段时间的思考,我想出了我的答案。可能它可以改进,但它似乎对我有用,所以我将 post 放在这里以供参考,以防其他人遇到类似问题。感谢ibebbs和Shlomo抽空回复,不胜感激
//Arrange
var s1 = Substitute.For<ISource>();
s1.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "a", "b", "c", "d" }));
var s2 = Substitute.For<ISource>();
s2.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "b", "xx", "c", "d" }));
var s3 = Substitute.For<ISource>();
s3.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "yy", "b", "ff", "d" }));
var expected = new[] { "b", "d" };
var sources = new[] { s1, s2, s3 }.ToObservable();
var scheduler = new TestScheduler();
var observer = scheduler.CreateObserver<IList<string>>();
//Act
sources.Buffer(TimeSpan.FromMilliseconds(500), scheduler)
.Select(s => Observable.CombineLatest(s.Select(x => x.ObserveData("NoFilter"))))
.Switch()
.Select(x =>IntersectAll(x))
.Do(x => Console.WriteLine($"Recieved {string.Join("," , x)}"))
.Subscribe(observer);
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks);
//Assert
observer.Messages.AssertEqual(
OnNext<IList<string>>(0, s => s.SequenceEqual(expected)),
OnCompleted<IList<string>>(0));
对于 IntersectAll,请参阅 Intersection of multiple lists with IEnumerable.Intersect()
这个怎么样:
public IObservable<IEnumerable<string>> From(this IObservable<ISource> sources, string filter)
{
return sources
.Scan(Observable.Empty<IEnumerable<string>>(), (agg, source) => Observable.Merge(agg, source.ObserveData(filter)))
.Switch();
}
请注意,每次从 sources
发出新的源时,之前发出的所有源都会再次调用其 ObserveData
方法。因此,此解决方案的扩展性不是特别好,但确实满足您的 'If a new source is added then everything should re-evaluate' 要求
好的,第二次尝试,我很确定这就是您需要的(底部包含测试夹具):
public interface ISource
{
IObservable<IEnumerable<string>> ObserveData(string filter);
}
public static class ArbitrarySources
{
public static IObservable<IEnumerable<string>> Intersection(this IObservable<ISource> sourceObservable, string filter)
{
return sourceObservable
.SelectMany((source, index) => source.ObserveData(filter).Select(values => new { Index = index, Values = values }))
.Scan(ImmutableDictionary<int, IEnumerable<string>>.Empty, (agg, tuple) => agg.SetItem(tuple.Index, tuple.Values))
.Select(dictionary => dictionary.Values.Aggregate(Enumerable.Empty<string>(), (agg, values) => agg.Any() ? agg.Intersect(values) : values).ToArray());
}
}
public class IntersectionTest
{
internal class Source : ISource
{
private readonly IObservable<IEnumerable<string>> _observable;
public Source(IObservable<IEnumerable<string>> observable)
{
_observable = observable;
}
public IObservable<IEnumerable<string>> ObserveData(string filter)
{
return _observable;
}
}
[Fact]
public void ShouldIntersectValues()
{
TestScheduler scheduler = new TestScheduler();
var sourceA = new Source(scheduler.CreateColdObservable(
new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "b" })),
new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "b", "c" }))
));
var sourceB = new Source(scheduler.CreateColdObservable(
new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "c" })),
new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "b", "c" }))
));
var sources = scheduler.CreateColdObservable(
new Recorded<Notification<ISource>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<ISource>(sourceA)),
new Recorded<Notification<ISource>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext<ISource>(sourceB))
);
var observer = scheduler.Start(() => sources.Intersection("test"), 0, 0, TimeSpan.FromSeconds(6).Ticks);
IEnumerable<string>[] actual = observer.Messages
.Select(message => message.Value)
.Where(notification => notification.Kind == NotificationKind.OnNext && notification.HasValue)
.Select(notification => notification.Value)
.ToArray();
IEnumerable<string>[] expected = new []
{
new [] { "a", "b" },
new [] { "a" },
new [] { "a", "c" },
new [] { "b", "c" }
};
Assert.Equal(expected.Length, actual.Length);
foreach (var tuple in expected.Zip(actual, (e, a) => new { Expected = e, Actual = a }))
{
Assert.Equal(tuple.Expected, tuple.Actual);
}
}
}
这种方法的额外好处是在添加新源时不重新查询现有源,但每次任何源发出值时都会重新计算交集。