使用反应式扩展的心跳模式
Heartbeat pattern using reactive extension
给定一个简单的场景:
A和B在一个房间里,A和B说话。房间很黑,B看不到A。B怎么知道是A在暂停还是A被从房间里绑架了?
当 A 说话时,A 提供 IObservable Talk,B 随后订阅 Talk.Subscribe(string=>process what A said)。 B 可以同时订阅 Observable.Interval Heartbeat 作为心跳检查。
我的问题是我应该对 merge/combine 两个 IObservable 使用什么运算符,这样如果 Talk 中没有项目超过 Heartbeat 的两个项目,B会认为A被绑架了
请注意,我想避免使用变量来存储状态,因为如果我没有正确同步该变量,它可能会导致副作用。
谢谢,
想象一个你想要操作的状态变量,状态代表自 'A' 上次发言以来的心跳次数。看起来像这样:
var stateObservable = Observable.Merge( //State represent number of heartbeats since A last spoke
aSource.Select(_ => new Func<int, int>(i => 0)), //When a talks, set state to 0
bHeartbeat.Select(_ => new Func<int, int>(i => i + 1)) //when b heartbeats, increment state
)
.Scan(0, (state, func) => func(state));
我们将 A 说话的事件表示为将状态重置为 0 的函数,将 B 心跳事件表示为递增状态。然后我们用Scan
函数累加。
剩下的就简单了:
var isKidnapped = stateObservable
.Where(state => state >= 2)
.Take(1);
isKidnapped.Subscribe(_ => Console.WriteLine("A is kidnapped"));
编辑:
这是一个 n A 来源的例子:
var aSources = new Subject<Tuple<string, Subject<string>>>();
var bHeartbeat = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
var stateObservable = aSources.SelectMany(t =>
Observable.Merge(
t.Item2.Select(_ => new Func<int, int>(i => 0)),
bHeartbeat.Select(_ => new Func<int, int>(i => i + 1))
)
.Scan(0, (state, func) => func(state))
.Where(state => state >= 2)
.Take(1)
.Select(_ => t.Item1)
);
stateObservable.Subscribe(s => Console.WriteLine($"{s} is kidnapped"));
aSources
.SelectMany(t => t.Item2.Select(s => Tuple.Create(t.Item1, s)))
.Subscribe(t => Console.WriteLine($"{t.Item1} says '{t.Item2}'"));
bHeartbeat.Subscribe(_ => Console.WriteLine("**Heartbeat**"));
var a = new Subject<string>();
var c = new Subject<string>();
var d = new Subject<string>();
var e = new Subject<string>();
var f = new Subject<string>();
aSources.OnNext(Tuple.Create("A", a));
aSources.OnNext(Tuple.Create("C", c));
aSources.OnNext(Tuple.Create("D", d));
aSources.OnNext(Tuple.Create("E", e));
aSources.OnNext(Tuple.Create("F", f));
a.OnNext("Hello");
c.OnNext("My name is C");
d.OnNext("D is for Dog");
await Task.Delay(TimeSpan.FromMilliseconds(1200));
e.OnNext("Easy-E here");
a.OnNext("A is for Apple");
await Task.Delay(TimeSpan.FromMilliseconds(2200));
给定一个简单的场景:
A和B在一个房间里,A和B说话。房间很黑,B看不到A。B怎么知道是A在暂停还是A被从房间里绑架了?
当 A 说话时,A 提供 IObservable Talk,B 随后订阅 Talk.Subscribe(string=>process what A said)。 B 可以同时订阅 Observable.Interval Heartbeat 作为心跳检查。
我的问题是我应该对 merge/combine 两个 IObservable 使用什么运算符,这样如果 Talk 中没有项目超过 Heartbeat 的两个项目,B会认为A被绑架了
请注意,我想避免使用变量来存储状态,因为如果我没有正确同步该变量,它可能会导致副作用。
谢谢,
想象一个你想要操作的状态变量,状态代表自 'A' 上次发言以来的心跳次数。看起来像这样:
var stateObservable = Observable.Merge( //State represent number of heartbeats since A last spoke
aSource.Select(_ => new Func<int, int>(i => 0)), //When a talks, set state to 0
bHeartbeat.Select(_ => new Func<int, int>(i => i + 1)) //when b heartbeats, increment state
)
.Scan(0, (state, func) => func(state));
我们将 A 说话的事件表示为将状态重置为 0 的函数,将 B 心跳事件表示为递增状态。然后我们用Scan
函数累加。
剩下的就简单了:
var isKidnapped = stateObservable
.Where(state => state >= 2)
.Take(1);
isKidnapped.Subscribe(_ => Console.WriteLine("A is kidnapped"));
编辑:
这是一个 n A 来源的例子:
var aSources = new Subject<Tuple<string, Subject<string>>>();
var bHeartbeat = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
var stateObservable = aSources.SelectMany(t =>
Observable.Merge(
t.Item2.Select(_ => new Func<int, int>(i => 0)),
bHeartbeat.Select(_ => new Func<int, int>(i => i + 1))
)
.Scan(0, (state, func) => func(state))
.Where(state => state >= 2)
.Take(1)
.Select(_ => t.Item1)
);
stateObservable.Subscribe(s => Console.WriteLine($"{s} is kidnapped"));
aSources
.SelectMany(t => t.Item2.Select(s => Tuple.Create(t.Item1, s)))
.Subscribe(t => Console.WriteLine($"{t.Item1} says '{t.Item2}'"));
bHeartbeat.Subscribe(_ => Console.WriteLine("**Heartbeat**"));
var a = new Subject<string>();
var c = new Subject<string>();
var d = new Subject<string>();
var e = new Subject<string>();
var f = new Subject<string>();
aSources.OnNext(Tuple.Create("A", a));
aSources.OnNext(Tuple.Create("C", c));
aSources.OnNext(Tuple.Create("D", d));
aSources.OnNext(Tuple.Create("E", e));
aSources.OnNext(Tuple.Create("F", f));
a.OnNext("Hello");
c.OnNext("My name is C");
d.OnNext("D is for Dog");
await Task.Delay(TimeSpan.FromMilliseconds(1200));
e.OnNext("Easy-E here");
a.OnNext("A is for Apple");
await Task.Delay(TimeSpan.FromMilliseconds(2200));