缓冲具有可变超时的选定消息
Buffer selected messages with variable timeout
我有一个包含字母 (A-Z) 和数字 (1-9) 的流。我确实想加入在超时内到达的字母(这可以改变)并且总是立即发出数字。你能建议我哪些函数最适合做这个吗?
示例工作代码(不确定这是正确的 and/or 一个好的解决方案):
private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms());
private IObservable<string> lettersJoined(IObservable<char> ob)
{
return Observable.Create<string>(observer =>
{
var letters = new List<char>();
var lettersFlush = new SerialDisposable();
return ob.Subscribe(c =>
{
if (char.IsUpper(c))
{
if ((await sTimeouts.FirstAsync()).Ticks > 0)
{
letters.Add(c);
lettersFlush.Disposable =
VariableTimeout(sTimeouts)
.Subscribe(x => {
observer.OnNext(String.Concat(letters));
letters.Clear();
});
}
else
observer.OnNext(letters.ToString());
}
else if (char.IsDigit(c))
observer.OnNext(c.ToString());
}
}
}
private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts)
{
return Observable.Create<long>(obs =>
{
var sd = new SerialDisposable();
var first = DateTime.Now;
return timeouts
.Subscribe(timeout =>
{
if (timeout.Ticks == 0 || first + timeout < DateTime.Now)
{
sd.Disposable = null;
obs.OnNext(timeout.Ticks);
obs.OnCompleted();
}
else
{
timeout -= DateTime.Now - first;
sd.Disposable =
Observable
.Timer(timeout)
.Subscribe(t => {
obs.OnNext(t);
obs.OnCompleted();
});
}
});
});
}
private void ChangeTimeout(int timeout)
{
sTimeouts.OnNext(timeout.ms())
}
// I use the following extension method
public static class TickExtensions
{
public static TimeSpan ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms);
}
}
要修改超时,我可以简单地更改私有超时变量,但如果 needed/better.
可能一个 Subject 就可以了
更新
var scheduler = new TestScheduler();
var timeout = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(0000.Ms(), 2000),
ReactiveTest.OnNext(4300.Ms(), 1000));
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(1600.Ms(), '2'),
ReactiveTest.OnNext(1900.Ms(), 'A'),
ReactiveTest.OnNext(2100.Ms(), 'B'),
ReactiveTest.OnNext(4500.Ms(), 'C'),
ReactiveTest.OnNext(5100.Ms(), 'A'),
ReactiveTest.OnNext(5500.Ms(), '5'),
ReactiveTest.OnNext(6000.Ms(), 'B'),
ReactiveTest.OnNext(7200.Ms(), '1'),
ReactiveTest.OnNext(7500.Ms(), 'B'),
ReactiveTest.OnNext(7700.Ms(), 'A'),
ReactiveTest.OnNext(8400.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(1600.Ms(), "2"),
ReactiveTest.OnNext(4100.Ms(), "AB"),
ReactiveTest.OnNext(5500.Ms(), "5"),
ReactiveTest.OnNext(7000.Ms(), "CAB"),
ReactiveTest.OnNext(7200.Ms(), "1"),
ReactiveTest.OnNext(9400.Ms(), "BAA"));
// if ReactiveTest.OnNext(3800.Ms(), 1000)
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB")
更新 #2
完善的解决方案正确支持缓冲期间的超时更改
假设 sampleInput
作为样本输入:
var charStream = "12ABCAB1BAA".ToObservable();
var random = new Random();
var randomMilliTimings = Enumerable.Range(0, 12)
.Select(i => random.Next(2000))
.ToList();
var sampleInput = charStream
.Zip(randomMilliTimings, (c, ts) => Tuple.Create(c, TimeSpan.FromMilliseconds(ts)))
.Select(t => Observable.Return(t.Item1).Delay(t.Item2))
.Concat();
首先,与其更改可变变量,不如生成一些流来表示您的缓冲区 windows:
Input: 1---2--A-B----C--A-B-1--B-A--A
Window: ---------*--------*---------*--
Output: 1---2----AB-------CAB-1-----BAA
我生成了一个递增的 TimeSpan
流,并像这样调用它 bufferBoundaries
来演示:
var bufferBoundaries = Observable.Range(1, 20)
.Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t)))
.Concat();
这看起来像这样:
Seconds: 0--1--2--3--4--5--6--7--8--9--10
BB : ---1-----2--------3-----------4-
... 接下来您要将 sampleInput
分成单独的字母和数字流,并相应地处理它们:
var letters = sampleInput
.Where(c => char.IsLetter(c))
.Buffer(bufferBoundaries)
.Where(l => l.Any())
.Select(lc => new string(lc.ToArray()));
var numbers = sampleInput
.Where(c => char.IsNumber(c))
.Select(c => c.ToString());
接下来,将两个流合并在一起:
var finalOutput = letters.Merge(numbers);
最后,如果可以的话,订阅同一个输入(在我们的例子中,sampleInput
)两次通常不是一个好主意。因此,在我们的例子中,我们应该将 letters
、numbers
和 finalOutput
替换为以下内容:
var publishedFinal = sampleInput
.Publish(_si => _si
.Where(c => char.IsLetter(c))
.Buffer(bufferBoundaries)
.Where(l => l.Any())
.Select(lc => new string(lc.ToArray()))
.Merge( _si
.Where(c => char.IsNumber(c))
.Select(c => c.ToString())
)
);
这里有几件事可能会有所帮助。
第一个弹珠图非常适合帮助可视化问题,但在证明某事是否有效时,让我们使用 ITestableObservable<T>
个实例进行规范和单元测试。
其次,我不确定您的解决方案应该是什么。如果我看一下您的大理石图,我会发现一些差异。在这里我添加了一个时间轴来帮助可视化。
111111111122222222223
Time: 123456789012345678901234567890
Input: 1---2--A-B----C--A-B-1--B-A--A
Output: 1---2----AB-------CAB-1-----BAA
在这里,我看到了第 10 单元发布的 "AB" 输出。
然后我看到第 19 单元发布的 "CAB" 输出。
此外,我还看到了第 29 单元发布的 "BAA" 输出。
但是您建议这些应该在恒定的超时间隔内发生。
所以我认为这可能是重要的价值观之间的差距,但这似乎也没有加起来。这只是让我回到上面的观点,请提供一个可能通过或失败的单元测试。
第三,关于您的实施,您可以通过将 SerialDisposable
类型用于 lettersFlush
类型来稍微改善它。
为了帮助我设置单元测试,我创建了以下代码块
var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0500.Ms(), '2'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(1000.Ms(), 'B'),
ReactiveTest.OnNext(1500.Ms(), 'C'),
ReactiveTest.OnNext(1800.Ms(), 'A'),
ReactiveTest.OnNext(2000.Ms(), 'B'),
ReactiveTest.OnNext(2200.Ms(), '1'),
ReactiveTest.OnNext(2500.Ms(), 'B'),
ReactiveTest.OnNext(2700.Ms(), 'A'),
ReactiveTest.OnNext(3000.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(0500.Ms(), "2"),
ReactiveTest.OnNext(1000.Ms(), "AB"),
ReactiveTest.OnNext(2000.Ms(), "CAB"),
ReactiveTest.OnNext(2200.Ms(), "1"),
ReactiveTest.OnNext(3000.Ms(), "BAA"));
我随意将一些值更改为我认为您的弹珠图所指的值。
如果我然后使用@Shlomo 上面提供的非常好的答案,我可以看到仅使用模糊大理石图的更多问题。由于缓冲区边界必须在要包含的最后一个值出现之后发生,因此这些 windows 需要关闭一次。
void Main()
{
var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0500.Ms(), '2'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(1000.Ms(), 'B'),
ReactiveTest.OnNext(1500.Ms(), 'C'),
ReactiveTest.OnNext(1800.Ms(), 'A'),
ReactiveTest.OnNext(2000.Ms(), 'B'),
ReactiveTest.OnNext(2200.Ms(), '1'),
ReactiveTest.OnNext(2500.Ms(), 'B'),
ReactiveTest.OnNext(2700.Ms(), 'A'),
ReactiveTest.OnNext(3000.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(0500.Ms(), "2"),
ReactiveTest.OnNext(1000.Ms()+1, "AB"),
ReactiveTest.OnNext(2000.Ms()+1, "CAB"),
ReactiveTest.OnNext(2200.Ms(), "1"),
ReactiveTest.OnNext(3000.Ms()+1, "BAA"));
/*
111111111122222222223
Time: 123456789012345678901234567890
Input: 1---2--A-B----C--A-B-1--B-A--A
Output: 1---2----AB-------CAB-1-----BAA
*/
var bufferBoundaries = //Observable.Timer(TimeSpan.FromSeconds(1), scheduler);
//Move to a hot test sequence to force the windows to close just after the values are produced
scheduler.CreateHotObservable<Unit>(
ReactiveTest.OnNext(1000.Ms()+1, Unit.Default),
ReactiveTest.OnNext(2000.Ms()+1, Unit.Default),
ReactiveTest.OnNext(3000.Ms()+1, Unit.Default),
ReactiveTest.OnNext(4000.Ms()+1, Unit.Default));
var publishedFinal = input
.Publish(i => i
.Where(c => char.IsLetter(c))
.Buffer(bufferBoundaries)
.Where(l => l.Any())
.Select(lc => new string(lc.ToArray()))
.Merge(i
.Where(c => char.IsNumber(c))
.Select(c => c.ToString())
)
);
var observer = scheduler.CreateObserver<string>();
publishedFinal.Subscribe(observer);
scheduler.Start();
//This test passes with the "+1" values hacked in.
ReactiveAssert.AreElementsEqual(
expected.Messages,
observer.Messages);
}
// Define other methods and classes here
public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
我想我的观点是 Rx 是确定性的,因此我们可以创建确定性的测试。因此,虽然您的问题非常好,而且我相信@Shlomo 提供了可靠的最终答案,但我们可以做得更好,而不仅仅是模糊大理石图和在我们的 examples/tests 中使用 Random
。
在这里保持精确应该有助于防止生产中出现愚蠢的竞争条件,并帮助 reader 更好地理解这些解决方案。
我有一个包含字母 (A-Z) 和数字 (1-9) 的流。我确实想加入在超时内到达的字母(这可以改变)并且总是立即发出数字。你能建议我哪些函数最适合做这个吗?
示例工作代码(不确定这是正确的 and/or 一个好的解决方案):
private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms());
private IObservable<string> lettersJoined(IObservable<char> ob)
{
return Observable.Create<string>(observer =>
{
var letters = new List<char>();
var lettersFlush = new SerialDisposable();
return ob.Subscribe(c =>
{
if (char.IsUpper(c))
{
if ((await sTimeouts.FirstAsync()).Ticks > 0)
{
letters.Add(c);
lettersFlush.Disposable =
VariableTimeout(sTimeouts)
.Subscribe(x => {
observer.OnNext(String.Concat(letters));
letters.Clear();
});
}
else
observer.OnNext(letters.ToString());
}
else if (char.IsDigit(c))
observer.OnNext(c.ToString());
}
}
}
private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts)
{
return Observable.Create<long>(obs =>
{
var sd = new SerialDisposable();
var first = DateTime.Now;
return timeouts
.Subscribe(timeout =>
{
if (timeout.Ticks == 0 || first + timeout < DateTime.Now)
{
sd.Disposable = null;
obs.OnNext(timeout.Ticks);
obs.OnCompleted();
}
else
{
timeout -= DateTime.Now - first;
sd.Disposable =
Observable
.Timer(timeout)
.Subscribe(t => {
obs.OnNext(t);
obs.OnCompleted();
});
}
});
});
}
private void ChangeTimeout(int timeout)
{
sTimeouts.OnNext(timeout.ms())
}
// I use the following extension method
public static class TickExtensions
{
public static TimeSpan ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms);
}
}
要修改超时,我可以简单地更改私有超时变量,但如果 needed/better.
可能一个 Subject 就可以了更新
var scheduler = new TestScheduler();
var timeout = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(0000.Ms(), 2000),
ReactiveTest.OnNext(4300.Ms(), 1000));
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(1600.Ms(), '2'),
ReactiveTest.OnNext(1900.Ms(), 'A'),
ReactiveTest.OnNext(2100.Ms(), 'B'),
ReactiveTest.OnNext(4500.Ms(), 'C'),
ReactiveTest.OnNext(5100.Ms(), 'A'),
ReactiveTest.OnNext(5500.Ms(), '5'),
ReactiveTest.OnNext(6000.Ms(), 'B'),
ReactiveTest.OnNext(7200.Ms(), '1'),
ReactiveTest.OnNext(7500.Ms(), 'B'),
ReactiveTest.OnNext(7700.Ms(), 'A'),
ReactiveTest.OnNext(8400.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(1600.Ms(), "2"),
ReactiveTest.OnNext(4100.Ms(), "AB"),
ReactiveTest.OnNext(5500.Ms(), "5"),
ReactiveTest.OnNext(7000.Ms(), "CAB"),
ReactiveTest.OnNext(7200.Ms(), "1"),
ReactiveTest.OnNext(9400.Ms(), "BAA"));
// if ReactiveTest.OnNext(3800.Ms(), 1000)
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB")
更新 #2
完善的解决方案正确支持缓冲期间的超时更改
假设 sampleInput
作为样本输入:
var charStream = "12ABCAB1BAA".ToObservable();
var random = new Random();
var randomMilliTimings = Enumerable.Range(0, 12)
.Select(i => random.Next(2000))
.ToList();
var sampleInput = charStream
.Zip(randomMilliTimings, (c, ts) => Tuple.Create(c, TimeSpan.FromMilliseconds(ts)))
.Select(t => Observable.Return(t.Item1).Delay(t.Item2))
.Concat();
首先,与其更改可变变量,不如生成一些流来表示您的缓冲区 windows:
Input: 1---2--A-B----C--A-B-1--B-A--A
Window: ---------*--------*---------*--
Output: 1---2----AB-------CAB-1-----BAA
我生成了一个递增的 TimeSpan
流,并像这样调用它 bufferBoundaries
来演示:
var bufferBoundaries = Observable.Range(1, 20)
.Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t)))
.Concat();
这看起来像这样:
Seconds: 0--1--2--3--4--5--6--7--8--9--10
BB : ---1-----2--------3-----------4-
... 接下来您要将 sampleInput
分成单独的字母和数字流,并相应地处理它们:
var letters = sampleInput
.Where(c => char.IsLetter(c))
.Buffer(bufferBoundaries)
.Where(l => l.Any())
.Select(lc => new string(lc.ToArray()));
var numbers = sampleInput
.Where(c => char.IsNumber(c))
.Select(c => c.ToString());
接下来,将两个流合并在一起:
var finalOutput = letters.Merge(numbers);
最后,如果可以的话,订阅同一个输入(在我们的例子中,sampleInput
)两次通常不是一个好主意。因此,在我们的例子中,我们应该将 letters
、numbers
和 finalOutput
替换为以下内容:
var publishedFinal = sampleInput
.Publish(_si => _si
.Where(c => char.IsLetter(c))
.Buffer(bufferBoundaries)
.Where(l => l.Any())
.Select(lc => new string(lc.ToArray()))
.Merge( _si
.Where(c => char.IsNumber(c))
.Select(c => c.ToString())
)
);
这里有几件事可能会有所帮助。
第一个弹珠图非常适合帮助可视化问题,但在证明某事是否有效时,让我们使用 ITestableObservable<T>
个实例进行规范和单元测试。
其次,我不确定您的解决方案应该是什么。如果我看一下您的大理石图,我会发现一些差异。在这里我添加了一个时间轴来帮助可视化。
111111111122222222223
Time: 123456789012345678901234567890
Input: 1---2--A-B----C--A-B-1--B-A--A
Output: 1---2----AB-------CAB-1-----BAA
在这里,我看到了第 10 单元发布的 "AB" 输出。 然后我看到第 19 单元发布的 "CAB" 输出。 此外,我还看到了第 29 单元发布的 "BAA" 输出。 但是您建议这些应该在恒定的超时间隔内发生。 所以我认为这可能是重要的价值观之间的差距,但这似乎也没有加起来。这只是让我回到上面的观点,请提供一个可能通过或失败的单元测试。
第三,关于您的实施,您可以通过将 SerialDisposable
类型用于 lettersFlush
类型来稍微改善它。
为了帮助我设置单元测试,我创建了以下代码块
var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0500.Ms(), '2'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(1000.Ms(), 'B'),
ReactiveTest.OnNext(1500.Ms(), 'C'),
ReactiveTest.OnNext(1800.Ms(), 'A'),
ReactiveTest.OnNext(2000.Ms(), 'B'),
ReactiveTest.OnNext(2200.Ms(), '1'),
ReactiveTest.OnNext(2500.Ms(), 'B'),
ReactiveTest.OnNext(2700.Ms(), 'A'),
ReactiveTest.OnNext(3000.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(0500.Ms(), "2"),
ReactiveTest.OnNext(1000.Ms(), "AB"),
ReactiveTest.OnNext(2000.Ms(), "CAB"),
ReactiveTest.OnNext(2200.Ms(), "1"),
ReactiveTest.OnNext(3000.Ms(), "BAA"));
我随意将一些值更改为我认为您的弹珠图所指的值。
如果我然后使用@Shlomo 上面提供的非常好的答案,我可以看到仅使用模糊大理石图的更多问题。由于缓冲区边界必须在要包含的最后一个值出现之后发生,因此这些 windows 需要关闭一次。
void Main()
{
var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0500.Ms(), '2'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(1000.Ms(), 'B'),
ReactiveTest.OnNext(1500.Ms(), 'C'),
ReactiveTest.OnNext(1800.Ms(), 'A'),
ReactiveTest.OnNext(2000.Ms(), 'B'),
ReactiveTest.OnNext(2200.Ms(), '1'),
ReactiveTest.OnNext(2500.Ms(), 'B'),
ReactiveTest.OnNext(2700.Ms(), 'A'),
ReactiveTest.OnNext(3000.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(0500.Ms(), "2"),
ReactiveTest.OnNext(1000.Ms()+1, "AB"),
ReactiveTest.OnNext(2000.Ms()+1, "CAB"),
ReactiveTest.OnNext(2200.Ms(), "1"),
ReactiveTest.OnNext(3000.Ms()+1, "BAA"));
/*
111111111122222222223
Time: 123456789012345678901234567890
Input: 1---2--A-B----C--A-B-1--B-A--A
Output: 1---2----AB-------CAB-1-----BAA
*/
var bufferBoundaries = //Observable.Timer(TimeSpan.FromSeconds(1), scheduler);
//Move to a hot test sequence to force the windows to close just after the values are produced
scheduler.CreateHotObservable<Unit>(
ReactiveTest.OnNext(1000.Ms()+1, Unit.Default),
ReactiveTest.OnNext(2000.Ms()+1, Unit.Default),
ReactiveTest.OnNext(3000.Ms()+1, Unit.Default),
ReactiveTest.OnNext(4000.Ms()+1, Unit.Default));
var publishedFinal = input
.Publish(i => i
.Where(c => char.IsLetter(c))
.Buffer(bufferBoundaries)
.Where(l => l.Any())
.Select(lc => new string(lc.ToArray()))
.Merge(i
.Where(c => char.IsNumber(c))
.Select(c => c.ToString())
)
);
var observer = scheduler.CreateObserver<string>();
publishedFinal.Subscribe(observer);
scheduler.Start();
//This test passes with the "+1" values hacked in.
ReactiveAssert.AreElementsEqual(
expected.Messages,
observer.Messages);
}
// Define other methods and classes here
public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
我想我的观点是 Rx 是确定性的,因此我们可以创建确定性的测试。因此,虽然您的问题非常好,而且我相信@Shlomo 提供了可靠的最终答案,但我们可以做得更好,而不仅仅是模糊大理石图和在我们的 examples/tests 中使用 Random
。
在这里保持精确应该有助于防止生产中出现愚蠢的竞争条件,并帮助 reader 更好地理解这些解决方案。