使用 rx 订阅事件并在时间间隔后执行日志记录
Using rx to subscribe to event and perform logging after time interval
我有一个简单的用例:
- 接收事件通知
- 对事件执行一些操作
- 打印x间隔后的内容
如何在单个 Rx 管道中执行上述步骤?
如下所示:
void Main()
{
var observable = Observable.Interval(TimeSpan.FromSeconds(1));
// Receive event and call Foo()
observable.Subscribe(x=>Foo());
// After 1 minute, I want to print the result of count
// How do I do this using above observable?
}
int count = 0;
void Foo()
{
Console.Write(".");
count ++;
}
我认为这符合您的要求:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1))
.Do(x => Foo())
.Window(() => Observable.Timer(TimeSpan.FromMinutes(1.0)));
var subscription =
observable
.Subscribe(xs => Console.WriteLine(count));
但是,将状态与可观察对象混合是个坏主意。如果您有两个订阅,您会以两倍的速度递增 count
。最好将您的状态封装在可观察对象中,以便每个订阅都会获得 count
.
的新实例
试试这个:
var observable =
Observable
.Defer(() =>
{
var count = 0;
return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(x =>
{
Console.Write(".");
return ++count;
});
})
.Window(() => Observable.Timer(TimeSpan.FromMinutes(0.1)))
.SelectMany(xs => xs.LastAsync());
var subscription =
observable
.Subscribe(x => Console.WriteLine(x));
我得到这样的输出:
...........................................................59
............................................................119
............................................................179
............................................................239
记住它是从 0 开始的,那么这个时机非常好。
看到 paulpdaniels 的回答后,我意识到我可以用更简单的 Sample
运算符替换我的 Window
/SelectMany
/LastAsync
。
此外,如果我们真的不需要递增计数器的副作用,那么整个可观察量将缩小为:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Do(x => Console.Write("."))
.Sample(TimeSpan.FromMinutes(1.0));
observable.Subscribe(x => Console.WriteLine(x));
简单多了!
我会用 Select
+ Sample
:
var observable = Observable.Interval(TimeSpan.FromSeconds(1))
.Select((x, i) => {
Foo(x);
return i;
})
.Do(_ => Console.Write("."))
.Sample(TimeSpan.FromMinutes(1));
observable.Subscribe(x => Console.WriteLine(x));
Select
有一个重载,即 returns 当前值的索引,通过返回它然后以 1 分钟的间隔进行采样,您可以获得该间隔内发出的最后一个值。
我有一个简单的用例:
- 接收事件通知
- 对事件执行一些操作
- 打印x间隔后的内容
如何在单个 Rx 管道中执行上述步骤?
如下所示:
void Main()
{
var observable = Observable.Interval(TimeSpan.FromSeconds(1));
// Receive event and call Foo()
observable.Subscribe(x=>Foo());
// After 1 minute, I want to print the result of count
// How do I do this using above observable?
}
int count = 0;
void Foo()
{
Console.Write(".");
count ++;
}
我认为这符合您的要求:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1))
.Do(x => Foo())
.Window(() => Observable.Timer(TimeSpan.FromMinutes(1.0)));
var subscription =
observable
.Subscribe(xs => Console.WriteLine(count));
但是,将状态与可观察对象混合是个坏主意。如果您有两个订阅,您会以两倍的速度递增 count
。最好将您的状态封装在可观察对象中,以便每个订阅都会获得 count
.
试试这个:
var observable =
Observable
.Defer(() =>
{
var count = 0;
return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(x =>
{
Console.Write(".");
return ++count;
});
})
.Window(() => Observable.Timer(TimeSpan.FromMinutes(0.1)))
.SelectMany(xs => xs.LastAsync());
var subscription =
observable
.Subscribe(x => Console.WriteLine(x));
我得到这样的输出:
...........................................................59
............................................................119
............................................................179
............................................................239
记住它是从 0 开始的,那么这个时机非常好。
看到 paulpdaniels 的回答后,我意识到我可以用更简单的 Sample
运算符替换我的 Window
/SelectMany
/LastAsync
。
此外,如果我们真的不需要递增计数器的副作用,那么整个可观察量将缩小为:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Do(x => Console.Write("."))
.Sample(TimeSpan.FromMinutes(1.0));
observable.Subscribe(x => Console.WriteLine(x));
简单多了!
我会用 Select
+ Sample
:
var observable = Observable.Interval(TimeSpan.FromSeconds(1))
.Select((x, i) => {
Foo(x);
return i;
})
.Do(_ => Console.Write("."))
.Sample(TimeSpan.FromMinutes(1));
observable.Subscribe(x => Console.WriteLine(x));
Select
有一个重载,即 returns 当前值的索引,通过返回它然后以 1 分钟的间隔进行采样,您可以获得该间隔内发出的最后一个值。