如何匹配两个因果流中的事件以检测过度滞后
How to match events in two causal streams to detect excessive lagging
我有两个热 observable,分别是对网络服务器的请求流 Q
和来自服务器的回复流 R
。回复总是按照请求的顺序发送,每个请求最终 都会收到一个回复。因此 R
、R1 中的第一个事件是对 Q
、Q1,依此类推。我需要检测何时回复 Rn 花费的时间比定义的超时时间长,并发出此超时条件的信号。
Q --1---2---------3-------> // Requests Q1, Q2...
R ----1-------------------> // Replies
Out ------------------O-|> // Oops: Reply R2 to Q2 did not arrive within time τ.
|<----τ---->|
事件 Qn 和 Rn 不包含任何识别信息(想想普通的无色圆形弹珠),以及图中的索引只是为了解释而引入的序号。
我好像解不开这个谜语。我尝试了下面的方法,但似乎我匹配 the latest request Qi to 最新的响应Rj。在示例 Q
中包含 5 个请求,间隔 500 毫秒,R
中的回复间隔 750 毫秒,从 200 毫秒开始,但只有 4 个(第 5 个无限期延迟)。代码没有检测到这一点,因为最后一个回复 R4 在 latest 请求 Q 之后的 1000 毫秒设置超时内5(实际上是 200 毫秒)。
var Q = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(_ => Unit.Default)
.Take(5).Concat(Observable.Never<Unit>());
var R = Observable.Interval(TimeSpan.FromMilliseconds(750)).Select(_ => Unit.Default)
.Delay(TimeSpan.FromMilliseconds(200))
.Take(4).Concat(Observable.Never<Unit>());
var dq = Q.Select(v => Observable.Return(v).Delay(TimeSpan.FromMilliseconds(1000)));
var dr = Observable.Zip(Q, R, (_1,_2) => Observable.Never<Unit>());
Observable.Merge(dq, dr).Dump().Switch().Dump();
我相信您希望收到请求 4 已超时(3 秒到期,但 3.2 秒到达)以及请求 5 从未到达的通知
void Main()
{
var scheduler = new TestScheduler();
var requests = scheduler.CreateHotObservable<int>(
ReactiveTest.OnNext(0500.Ms(), 1),
ReactiveTest.OnNext(1000.Ms(), 2),
ReactiveTest.OnNext(1500.Ms(), 3),
ReactiveTest.OnNext(2000.Ms(), 4),
ReactiveTest.OnNext(2500.Ms(), 5));
var responses = scheduler.CreateHotObservable<Unit>(
ReactiveTest.OnNext(0950.Ms(), Unit.Default),
ReactiveTest.OnNext(1700.Ms(), Unit.Default),
ReactiveTest.OnNext(2450.Ms(), Unit.Default),
ReactiveTest.OnNext(3200.Ms(), Unit.Default));
var expected = scheduler.CreateHotObservable<int>(
ReactiveTest.OnNext(3000.Ms(), 4),
ReactiveTest.OnNext(3500.Ms(), 5)
);
var observer = scheduler.CreateObserver<int>();
var query = responses
.Select((val, idx)=>idx)
.Publish(responseIdxs =>
{
return requests.SelectMany((q, qIdx) =>
Observable.Timer(TimeSpan.FromSeconds(1), scheduler)
.TakeUntil(responseIdxs.Where(rIdx => qIdx == rIdx))
.Select(_ => q));
});
query.Subscribe(observer);
scheduler.Start();
//This test passes
ReactiveAssert.AreElementsEqual(
expected.Messages,
observer.Messages);
}
// Define other methods and classes here
public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
我有两个热 observable,分别是对网络服务器的请求流 Q
和来自服务器的回复流 R
。回复总是按照请求的顺序发送,每个请求最终 都会收到一个回复。因此 R
、R1 中的第一个事件是对 Q
、Q1,依此类推。我需要检测何时回复 Rn 花费的时间比定义的超时时间长,并发出此超时条件的信号。
Q --1---2---------3-------> // Requests Q1, Q2...
R ----1-------------------> // Replies
Out ------------------O-|> // Oops: Reply R2 to Q2 did not arrive within time τ.
|<----τ---->|
事件 Qn 和 Rn 不包含任何识别信息(想想普通的无色圆形弹珠),以及图中的索引只是为了解释而引入的序号。
我好像解不开这个谜语。我尝试了下面的方法,但似乎我匹配 the latest request Qi to 最新的响应Rj。在示例 Q
中包含 5 个请求,间隔 500 毫秒,R
中的回复间隔 750 毫秒,从 200 毫秒开始,但只有 4 个(第 5 个无限期延迟)。代码没有检测到这一点,因为最后一个回复 R4 在 latest 请求 Q 之后的 1000 毫秒设置超时内5(实际上是 200 毫秒)。
var Q = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(_ => Unit.Default)
.Take(5).Concat(Observable.Never<Unit>());
var R = Observable.Interval(TimeSpan.FromMilliseconds(750)).Select(_ => Unit.Default)
.Delay(TimeSpan.FromMilliseconds(200))
.Take(4).Concat(Observable.Never<Unit>());
var dq = Q.Select(v => Observable.Return(v).Delay(TimeSpan.FromMilliseconds(1000)));
var dr = Observable.Zip(Q, R, (_1,_2) => Observable.Never<Unit>());
Observable.Merge(dq, dr).Dump().Switch().Dump();
我相信您希望收到请求 4 已超时(3 秒到期,但 3.2 秒到达)以及请求 5 从未到达的通知
void Main()
{
var scheduler = new TestScheduler();
var requests = scheduler.CreateHotObservable<int>(
ReactiveTest.OnNext(0500.Ms(), 1),
ReactiveTest.OnNext(1000.Ms(), 2),
ReactiveTest.OnNext(1500.Ms(), 3),
ReactiveTest.OnNext(2000.Ms(), 4),
ReactiveTest.OnNext(2500.Ms(), 5));
var responses = scheduler.CreateHotObservable<Unit>(
ReactiveTest.OnNext(0950.Ms(), Unit.Default),
ReactiveTest.OnNext(1700.Ms(), Unit.Default),
ReactiveTest.OnNext(2450.Ms(), Unit.Default),
ReactiveTest.OnNext(3200.Ms(), Unit.Default));
var expected = scheduler.CreateHotObservable<int>(
ReactiveTest.OnNext(3000.Ms(), 4),
ReactiveTest.OnNext(3500.Ms(), 5)
);
var observer = scheduler.CreateObserver<int>();
var query = responses
.Select((val, idx)=>idx)
.Publish(responseIdxs =>
{
return requests.SelectMany((q, qIdx) =>
Observable.Timer(TimeSpan.FromSeconds(1), scheduler)
.TakeUntil(responseIdxs.Where(rIdx => qIdx == rIdx))
.Select(_ => q));
});
query.Subscribe(observer);
scheduler.Start();
//This test passes
ReactiveAssert.AreElementsEqual(
expected.Messages,
observer.Messages);
}
// Define other methods and classes here
public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}