使用 Rx 按键合并多个源

Using Rx to merge multiple sources by key

我对反应式扩展有点陌生,但由于我有一个数据流很重的问题,我假设它可以大大简化我的实现。但我的问题似乎比我预期的要复杂一些。

问题

我有多个数据源,它们都为同一个实体发出部分数据。例如,我有 datasource1,它发出一个人的名字,datasource2 发出一个人的姓氏。这些数据的到来是完全无法预测的。

我现在需要做的是观察这两个来源,并使用某种运算符或主题,这使我可以等待两个来源观察对象。如果两个数据源 return 它们的特定部分,我只想继续。我的两个来源也传递了数据的密钥,因此可以 link 稍后将它们放在一起。

Reactive 中是否有内置的结构允许我这样做?还是反应式只是解决我问题的错误工具集?

我无法判断 Rx 或 async/await 或 TPL-Dataflow 是更好的解决方案,因为这可能取决于您的大型应用程序。一些可重现的代码真的会有帮助。

无论如何,这是一个 Rx 解决方案。我现在假设 datasource1datasource2 是不同类型的可观察对象,或者很容易转换为不同类型的可观察对象。如果它们是同一类型的可观察对象,此解决方案也可以,但您也可以有其他选择:

var firstNameSource = new Subject<FirstNameMessage>();
var lastNameSource = new Subject<LastNameMessage>();
var timeout = TimeSpan.FromSeconds(1); //Set to length of time willing to wait

var join = firstNameSource.Join(lastNameSource,
        fnm => Observable.Timer(timeout),
        lnm => Observable.Timer(timeout),
        (fnm, lnm) => new { FirstNameMessage = fnm, LastNameMessage = lnm }
    )
    .Where(a => a.FirstNameMessage.Id == a.LastNameMessage.Id)
    .Select(a => Tuple.Create(a.FirstNameMessage.Name, a.LastNameMessage.Name))
    .Timeout(timeout)
    .Catch(Observable.Empty<Tuple<string, string>>());

使用这些示例 类:

public class FirstNameMessage
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public class LastNameMessage
{
    public int Id { get; set; }
    public string Name { get; set; }
}

这是一些示例 subscription/execution 代码:

join.Subscribe(t => Console.WriteLine($"{t.Item1} {t.Item2}"), () => Console.WriteLine("No more names!"));

firstNameSource.OnNext(new FirstNameMessage{Id = 1, Name = "John" });
lastNameSource.OnNext(new LastNameMessage{Id = 1, Name = "Smith" });

lastNameSource.OnNext(new LastNameMessage { Id = 2, Name = "Jones" });
await Task.Delay(TimeSpan.FromMilliseconds(500));
firstNameSource.OnNext(new FirstNameMessage { Id = 2, Name = "Paul" });

firstNameSource.OnNext(new FirstNameMessage { Id = 3, Name = "Larry" });
await Task.Delay(TimeSpan.FromMilliseconds(1500));
lastNameSource.OnNext(new LastNameMessage { Id = 3, Name = "Fail" });

firstNameSource.OnNext(new FirstNameMessage { Id = 4, Name = "Won't Work" });
lastNameSource.OnNext(new LastNameMessage { Id = 4, Name = "Subscription terminated" });

解释:

此解决方案的关键部分是 Join 运算符。标准 DB/LINQ Join 通过键连接事物,而 Rx 的 Join 通过时间 window 连接。因此,上面的 Join 加入了彼此 timeout 时间跨度内的任何 FirstNameMessageLastNameMessage。由于我们也想通过键加入,这就是 Where 子句存在的原因。

末尾的 TimeOutCatch 调用可能是多余的:它们只是用于终止订阅。听起来您的解决方案可能只是在等待一个值,而不是多个值,因此可能需要这样做。