缓冲具有可变超时的选定消息

Buffer selected messages with variable timeout

我有一个包含字母 (A-Z) 和数字 (1-9) 的流。我确实想加入在超时内到达的字母(这可以改变)并且总是立即发出数字。你能建议我哪些函数最适合做这个吗?

示例工作代码(不确定这是正确的 and/or 一个好的解决方案):

private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms());

private IObservable<string> lettersJoined(IObservable<char> ob)
{
    return Observable.Create<string>(observer =>
    {
        var letters = new List<char>();
        var lettersFlush = new SerialDisposable();

        return ob.Subscribe(c =>
        {
            if (char.IsUpper(c))
            {

                if ((await sTimeouts.FirstAsync()).Ticks > 0)
                {
                    letters.Add(c);

                    lettersFlush.Disposable =
                        VariableTimeout(sTimeouts)
                        .Subscribe(x => {
                            observer.OnNext(String.Concat(letters));
                            letters.Clear();
                        });

                }
                else
                    observer.OnNext(letters.ToString());


            }
            else if (char.IsDigit(c))
                observer.OnNext(c.ToString());
        }

    }
}


private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts)
{
    return Observable.Create<long>(obs =>
    {
        var sd = new SerialDisposable();
        var first = DateTime.Now;

        return timeouts
            .Subscribe(timeout =>
            {
                if (timeout.Ticks == 0 || first + timeout < DateTime.Now)
                {
                    sd.Disposable = null;
                    obs.OnNext(timeout.Ticks);
                    obs.OnCompleted();
                }
                else
                {
                    timeout -= DateTime.Now - first;

                    sd.Disposable =
                        Observable
                        .Timer(timeout)
                        .Subscribe(t => {
                            obs.OnNext(t);
                            obs.OnCompleted();
                        });
                }
            });

    });
}

private void ChangeTimeout(int timeout)
{
    sTimeouts.OnNext(timeout.ms())
}


// I use the following extension method
public static class TickExtensions
{
    public static TimeSpan ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms);
    }
}

要修改超时,我可以简单地更改私有超时变量,但如果 needed/better.

可能一个 Subject 就可以了

更新

var scheduler = new TestScheduler();

var timeout = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(0000.Ms(), 2000),
    ReactiveTest.OnNext(4300.Ms(), 1000));

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(1600.Ms(), '2'),
    ReactiveTest.OnNext(1900.Ms(), 'A'),
    ReactiveTest.OnNext(2100.Ms(), 'B'),
    ReactiveTest.OnNext(4500.Ms(), 'C'),
    ReactiveTest.OnNext(5100.Ms(), 'A'),
    ReactiveTest.OnNext(5500.Ms(), '5'),
    ReactiveTest.OnNext(6000.Ms(), 'B'),
    ReactiveTest.OnNext(7200.Ms(), '1'),
    ReactiveTest.OnNext(7500.Ms(), 'B'),
    ReactiveTest.OnNext(7700.Ms(), 'A'),
    ReactiveTest.OnNext(8400.Ms(), 'A'));

var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"),
    ReactiveTest.OnNext(1600.Ms(), "2"),
    ReactiveTest.OnNext(4100.Ms(), "AB"),
    ReactiveTest.OnNext(5500.Ms(), "5"),
    ReactiveTest.OnNext(7000.Ms(), "CAB"),
    ReactiveTest.OnNext(7200.Ms(), "1"),
    ReactiveTest.OnNext(9400.Ms(), "BAA"));


// if ReactiveTest.OnNext(3800.Ms(), 1000)
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB")

更新 #2

完善的解决方案正确支持缓冲期间的超时更改

假设 sampleInput 作为样本输入:

var charStream = "12ABCAB1BAA".ToObservable();
var random = new Random();
var randomMilliTimings = Enumerable.Range(0, 12)
    .Select(i => random.Next(2000))
    .ToList();

var sampleInput = charStream
    .Zip(randomMilliTimings, (c, ts) => Tuple.Create(c, TimeSpan.FromMilliseconds(ts)))
    .Select(t => Observable.Return(t.Item1).Delay(t.Item2))
    .Concat();

首先,与其更改可变变量,不如生成一些流来表示您的缓冲区 windows:

Input:  1---2--A-B----C--A-B-1--B-A--A
Window: ---------*--------*---------*--
Output: 1---2----AB-------CAB-1-----BAA

我生成了一个递增的 TimeSpan 流,并像这样调用它 bufferBoundaries 来演示:

var bufferBoundaries = Observable.Range(1, 20)
    .Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t)))
    .Concat();

这看起来像这样:

Seconds: 0--1--2--3--4--5--6--7--8--9--10
BB     : ---1-----2--------3-----------4-

... 接下来您要将 sampleInput 分成单独的字母和数字流,并相应地处理它们:

var letters = sampleInput
    .Where(c => char.IsLetter(c))
    .Buffer(bufferBoundaries)
    .Where(l => l.Any())
    .Select(lc => new string(lc.ToArray()));

var numbers = sampleInput
    .Where(c => char.IsNumber(c))
    .Select(c => c.ToString());

接下来,将两个流合并在一起:

var finalOutput = letters.Merge(numbers);

最后,如果可以的话,订阅同一个输入(在我们的例子中,sampleInput)两次通常不是一个好主意。因此,在我们的例子中,我们应该将 lettersnumbersfinalOutput 替换为以下内容:

var publishedFinal = sampleInput
    .Publish(_si => _si
        .Where(c => char.IsLetter(c))
        .Buffer(bufferBoundaries)
        .Where(l => l.Any())
        .Select(lc => new string(lc.ToArray()))
        .Merge( _si
            .Where(c => char.IsNumber(c))
            .Select(c => c.ToString())
        )
    );

这里有几件事可能会有所帮助。

第一个弹珠图非常适合帮助可视化问题,但在证明某事是否有效时,让我们使用 ITestableObservable<T> 个实例进行规范和单元测试。

其次,我不确定您的解决方案应该是什么。如果我看一下您的大理石图,我会发现一些差异。在这里我添加了一个时间轴来帮助可视化。

                 111111111122222222223
Time:   123456789012345678901234567890
Input:  1---2--A-B----C--A-B-1--B-A--A
Output: 1---2----AB-------CAB-1-----BAA 

在这里,我看到了第 10 单元发布的 "AB" 输出。 然后我看到第 19 单元发布的 "CAB" 输出。 此外,我还看到了第 29 单元发布的 "BAA" 输出。 但是您建议这些应该在恒定的超时间隔内发生。 所以我认为这可能是重要的价值观之间的差距,但这似乎也没有加起来。这只是让我回到上面的观点,请提供一个可能通过或失败的单元测试。

第三,关于您的实施,您可以通过将 SerialDisposable 类型用于 lettersFlush 类型来稍微改善它。

为了帮助我设置单元测试,我创建了以下代码块

var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(0500.Ms(), '2'),
    ReactiveTest.OnNext(0800.Ms(), 'A'),
    ReactiveTest.OnNext(1000.Ms(), 'B'),
    ReactiveTest.OnNext(1500.Ms(), 'C'),
    ReactiveTest.OnNext(1800.Ms(), 'A'),
    ReactiveTest.OnNext(2000.Ms(), 'B'),
    ReactiveTest.OnNext(2200.Ms(), '1'),
    ReactiveTest.OnNext(2500.Ms(), 'B'),
    ReactiveTest.OnNext(2700.Ms(), 'A'),
    ReactiveTest.OnNext(3000.Ms(), 'A'));

var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"),
    ReactiveTest.OnNext(0500.Ms(), "2"),
    ReactiveTest.OnNext(1000.Ms(), "AB"),
    ReactiveTest.OnNext(2000.Ms(), "CAB"),
    ReactiveTest.OnNext(2200.Ms(), "1"),
    ReactiveTest.OnNext(3000.Ms(), "BAA"));

我随意将一些值更改为我认为您的弹珠图所指的值。

如果我然后使用@Shlomo 上面提供的非常好的答案,我可以看到仅使用模糊大理石图的更多问题。由于缓冲区边界必须在要包含的最后一个值出现之后发生,因此这些 windows 需要关闭一次。

void Main()
{
    var scheduler = new TestScheduler();
    var input = scheduler.CreateColdObservable<char>(
        ReactiveTest.OnNext(0100.Ms(), '1'),
        ReactiveTest.OnNext(0500.Ms(), '2'),
        ReactiveTest.OnNext(0800.Ms(), 'A'),
        ReactiveTest.OnNext(1000.Ms(), 'B'),
        ReactiveTest.OnNext(1500.Ms(), 'C'),
        ReactiveTest.OnNext(1800.Ms(), 'A'),
        ReactiveTest.OnNext(2000.Ms(), 'B'),
        ReactiveTest.OnNext(2200.Ms(), '1'),
        ReactiveTest.OnNext(2500.Ms(), 'B'),
        ReactiveTest.OnNext(2700.Ms(), 'A'),
        ReactiveTest.OnNext(3000.Ms(), 'A'));

    var expected = scheduler.CreateColdObservable<string>(
        ReactiveTest.OnNext(0100.Ms(), "1"),
        ReactiveTest.OnNext(0500.Ms(), "2"),
        ReactiveTest.OnNext(1000.Ms()+1, "AB"),
        ReactiveTest.OnNext(2000.Ms()+1, "CAB"),
        ReactiveTest.OnNext(2200.Ms(), "1"),
        ReactiveTest.OnNext(3000.Ms()+1, "BAA"));

    /*
                     111111111122222222223
    Time:   123456789012345678901234567890
    Input:  1---2--A-B----C--A-B-1--B-A--A
    Output: 1---2----AB-------CAB-1-----BAA 
    */

    var bufferBoundaries = //Observable.Timer(TimeSpan.FromSeconds(1), scheduler);
            //Move to a hot test sequence to force the windows to close just after the values are produced
            scheduler.CreateHotObservable<Unit>(
        ReactiveTest.OnNext(1000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(2000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(3000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(4000.Ms()+1, Unit.Default));

    var publishedFinal = input
        .Publish(i => i
            .Where(c => char.IsLetter(c))
            .Buffer(bufferBoundaries)
            .Where(l => l.Any())
            .Select(lc => new string(lc.ToArray()))
            .Merge(i
                .Where(c => char.IsNumber(c))
                .Select(c => c.ToString())
            )
        );

    var observer = scheduler.CreateObserver<string>();

    publishedFinal.Subscribe(observer);
    scheduler.Start();

    //This test passes with the "+1" values hacked in.
    ReactiveAssert.AreElementsEqual(
        expected.Messages,
        observer.Messages);

}

// Define other methods and classes here
public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

我想我的观点是 Rx 是确定性的,因此我们可以创建确定性的测试。因此,虽然您的问题非常好,而且我相信@Shlomo 提供了可靠的最终答案,但我们可以做得更好,而不仅仅是模糊大理石图和在我们的 examples/tests 中使用 Random。 在这里保持精确应该有助于防止生产中出现愚蠢的竞争条件,并帮助 reader 更好地理解这些解决方案。