使用 Rx 按键合并多个源
Using Rx to merge multiple sources by key
我对反应式扩展有点陌生,但由于我有一个数据流很重的问题,我假设它可以大大简化我的实现。但我的问题似乎比我预期的要复杂一些。
问题
我有多个数据源,它们都为同一个实体发出部分数据。例如,我有 datasource1
,它发出一个人的名字,datasource2
发出一个人的姓氏。这些数据的到来是完全无法预测的。
我现在需要做的是观察这两个来源,并使用某种运算符或主题,这使我可以等待两个来源观察对象。如果两个数据源 return 它们的特定部分,我只想继续。我的两个来源也传递了数据的密钥,因此可以 link 稍后将它们放在一起。
Reactive 中是否有内置的结构允许我这样做?还是反应式只是解决我问题的错误工具集?
我无法判断 Rx 或 async/await 或 TPL-Dataflow 是更好的解决方案,因为这可能取决于您的大型应用程序。一些可重现的代码真的会有帮助。
无论如何,这是一个 Rx 解决方案。我现在假设 datasource1
和 datasource2
是不同类型的可观察对象,或者很容易转换为不同类型的可观察对象。如果它们是同一类型的可观察对象,此解决方案也可以,但您也可以有其他选择:
var firstNameSource = new Subject<FirstNameMessage>();
var lastNameSource = new Subject<LastNameMessage>();
var timeout = TimeSpan.FromSeconds(1); //Set to length of time willing to wait
var join = firstNameSource.Join(lastNameSource,
fnm => Observable.Timer(timeout),
lnm => Observable.Timer(timeout),
(fnm, lnm) => new { FirstNameMessage = fnm, LastNameMessage = lnm }
)
.Where(a => a.FirstNameMessage.Id == a.LastNameMessage.Id)
.Select(a => Tuple.Create(a.FirstNameMessage.Name, a.LastNameMessage.Name))
.Timeout(timeout)
.Catch(Observable.Empty<Tuple<string, string>>());
使用这些示例 类:
public class FirstNameMessage
{
public int Id { get; set; }
public string Name { get; set; }
}
public class LastNameMessage
{
public int Id { get; set; }
public string Name { get; set; }
}
这是一些示例 subscription/execution 代码:
join.Subscribe(t => Console.WriteLine($"{t.Item1} {t.Item2}"), () => Console.WriteLine("No more names!"));
firstNameSource.OnNext(new FirstNameMessage{Id = 1, Name = "John" });
lastNameSource.OnNext(new LastNameMessage{Id = 1, Name = "Smith" });
lastNameSource.OnNext(new LastNameMessage { Id = 2, Name = "Jones" });
await Task.Delay(TimeSpan.FromMilliseconds(500));
firstNameSource.OnNext(new FirstNameMessage { Id = 2, Name = "Paul" });
firstNameSource.OnNext(new FirstNameMessage { Id = 3, Name = "Larry" });
await Task.Delay(TimeSpan.FromMilliseconds(1500));
lastNameSource.OnNext(new LastNameMessage { Id = 3, Name = "Fail" });
firstNameSource.OnNext(new FirstNameMessage { Id = 4, Name = "Won't Work" });
lastNameSource.OnNext(new LastNameMessage { Id = 4, Name = "Subscription terminated" });
解释:
此解决方案的关键部分是 Join 运算符。标准 DB/LINQ Join
通过键连接事物,而 Rx 的 Join
通过时间 window 连接。因此,上面的 Join
加入了彼此 timeout
时间跨度内的任何 FirstNameMessage
和 LastNameMessage
。由于我们也想通过键加入,这就是 Where
子句存在的原因。
末尾的 TimeOut
和 Catch
调用可能是多余的:它们只是用于终止订阅。听起来您的解决方案可能只是在等待一个值,而不是多个值,因此可能需要这样做。
我对反应式扩展有点陌生,但由于我有一个数据流很重的问题,我假设它可以大大简化我的实现。但我的问题似乎比我预期的要复杂一些。
问题
我有多个数据源,它们都为同一个实体发出部分数据。例如,我有 datasource1
,它发出一个人的名字,datasource2
发出一个人的姓氏。这些数据的到来是完全无法预测的。
我现在需要做的是观察这两个来源,并使用某种运算符或主题,这使我可以等待两个来源观察对象。如果两个数据源 return 它们的特定部分,我只想继续。我的两个来源也传递了数据的密钥,因此可以 link 稍后将它们放在一起。
Reactive 中是否有内置的结构允许我这样做?还是反应式只是解决我问题的错误工具集?
我无法判断 Rx 或 async/await 或 TPL-Dataflow 是更好的解决方案,因为这可能取决于您的大型应用程序。一些可重现的代码真的会有帮助。
无论如何,这是一个 Rx 解决方案。我现在假设 datasource1
和 datasource2
是不同类型的可观察对象,或者很容易转换为不同类型的可观察对象。如果它们是同一类型的可观察对象,此解决方案也可以,但您也可以有其他选择:
var firstNameSource = new Subject<FirstNameMessage>();
var lastNameSource = new Subject<LastNameMessage>();
var timeout = TimeSpan.FromSeconds(1); //Set to length of time willing to wait
var join = firstNameSource.Join(lastNameSource,
fnm => Observable.Timer(timeout),
lnm => Observable.Timer(timeout),
(fnm, lnm) => new { FirstNameMessage = fnm, LastNameMessage = lnm }
)
.Where(a => a.FirstNameMessage.Id == a.LastNameMessage.Id)
.Select(a => Tuple.Create(a.FirstNameMessage.Name, a.LastNameMessage.Name))
.Timeout(timeout)
.Catch(Observable.Empty<Tuple<string, string>>());
使用这些示例 类:
public class FirstNameMessage
{
public int Id { get; set; }
public string Name { get; set; }
}
public class LastNameMessage
{
public int Id { get; set; }
public string Name { get; set; }
}
这是一些示例 subscription/execution 代码:
join.Subscribe(t => Console.WriteLine($"{t.Item1} {t.Item2}"), () => Console.WriteLine("No more names!"));
firstNameSource.OnNext(new FirstNameMessage{Id = 1, Name = "John" });
lastNameSource.OnNext(new LastNameMessage{Id = 1, Name = "Smith" });
lastNameSource.OnNext(new LastNameMessage { Id = 2, Name = "Jones" });
await Task.Delay(TimeSpan.FromMilliseconds(500));
firstNameSource.OnNext(new FirstNameMessage { Id = 2, Name = "Paul" });
firstNameSource.OnNext(new FirstNameMessage { Id = 3, Name = "Larry" });
await Task.Delay(TimeSpan.FromMilliseconds(1500));
lastNameSource.OnNext(new LastNameMessage { Id = 3, Name = "Fail" });
firstNameSource.OnNext(new FirstNameMessage { Id = 4, Name = "Won't Work" });
lastNameSource.OnNext(new LastNameMessage { Id = 4, Name = "Subscription terminated" });
解释:
此解决方案的关键部分是 Join 运算符。标准 DB/LINQ Join
通过键连接事物,而 Rx 的 Join
通过时间 window 连接。因此,上面的 Join
加入了彼此 timeout
时间跨度内的任何 FirstNameMessage
和 LastNameMessage
。由于我们也想通过键加入,这就是 Where
子句存在的原因。
末尾的 TimeOut
和 Catch
调用可能是多余的:它们只是用于终止订阅。听起来您的解决方案可能只是在等待一个值,而不是多个值,因此可能需要这样做。