使用 rx 订阅事件并在时间间隔后执行日志记录

Using rx to subscribe to event and perform logging after time interval

我有一个简单的用例:

  1. 接收事件通知
  2. 对事件执行一些操作
  3. 打印x间隔后的内容

如何在单个 Rx 管道中执行上述步骤?

如下所示:

void Main()
{
    var observable = Observable.Interval(TimeSpan.FromSeconds(1));

    // Receive event and call Foo()
    observable.Subscribe(x=>Foo());
    // After 1 minute, I want to print the result of count
    // How do I do this using above observable?
}

int count = 0;
void Foo()
{
    Console.Write(".");
    count ++;
}

我认为这符合您的要求:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Do(x => Foo())
        .Window(() => Observable.Timer(TimeSpan.FromMinutes(1.0)));

var subscription =
    observable
        .Subscribe(xs => Console.WriteLine(count));

但是,将状态与可观察对象混合是个坏主意。如果您有两个订阅,您会以两倍的速度递增 count。最好将您的状态封装在可观察对象中,以便每个订阅都会获得 count.

的新实例

试试这个:

var observable =
    Observable
        .Defer(() =>
        {
            var count = 0;
            return
                Observable
                    .Interval(TimeSpan.FromSeconds(1))
                    .Select(x =>
                    {
                        Console.Write(".");
                        return ++count;
                    });
        })
        .Window(() => Observable.Timer(TimeSpan.FromMinutes(0.1)))
        .SelectMany(xs => xs.LastAsync());

var subscription =
    observable
        .Subscribe(x => Console.WriteLine(x));

我得到这样的输出:

...........................................................59
............................................................119
............................................................179
............................................................239

记住它是从 0 开始的,那么这个时机非常好。


看到 paulpdaniels 的回答后,我意识到我可以用更简单的 Sample 运算符替换我的 Window/SelectMany/LastAsync

此外,如果我们真的不需要递增计数器的副作用,那么整个可观察量将缩小为:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Do(x => Console.Write("."))
        .Sample(TimeSpan.FromMinutes(1.0));

observable.Subscribe(x => Console.WriteLine(x));

简单多了!

我会用 Select + Sample:

var observable = Observable.Interval(TimeSpan.FromSeconds(1))
          .Select((x, i) => {
            Foo(x);
            return i;
          })
          .Do(_ => Console.Write("."))
          .Sample(TimeSpan.FromMinutes(1));

observable.Subscribe(x => Console.WriteLine(x));

Select 有一个重载,即 returns 当前值的索引,通过返回它然后以 1 分钟的间隔进行采样,您可以获得该间隔内发出的最后一个值。