如果新项目到达可观察对象,我可以检查观察者吗?
Can I check in observer if a new item arrived to the observable?
我正在访问属于不同计算进程的内存区域。
该区域的变化相对较少,我需要在有变化时运行 计算一次。我收到有关更改的通知,但我需要稍等片刻以确保不再进行任何更改。我这样建模:
var readyToBeProcessed = changed
.Select(x => DateTime.Now)
.Throttle(TimeSpan.FromSeconds(5));
但是我的计算需要相当长的时间,而且在我计算的过程中内存可能会发生变化。在这种情况下,我需要将这一轮特定的计算标记为无效。
但是我如何在我的观察者中知道,当我完成计算时,在处理当前事件时是否有另一个事件到达?如果没有事件到达,因为我开始计算,然后它有效,我可以存储结果。
在实践中,事件以允许计算无效的模式(足够快)到达的情况很少见,但我仍然愿意满足这种情况。
注意:我知道我不能保证总是有有效的计算。在内存发生变化和我收到事件之间有一小段时间。完全有可能,序列是这样的 1)我正在计算 2)内存变化 3)我完成计算并检查事件,并确定计算有效 4)内存变化事件到达。我很高兴现在能忍受这个
readyToBeProcessed.Subscribe(x =>
{
Log.Info("Start work...");
// Do calculation here
...
// When finished
if (Is there a new item)
{
Log.Info("There were changes while we worked... Invalidating");
Invalidate();
}
else
{
Log.Info("Succeeded");
}
}, cancellationToken);
Reactive 不适合这项任务吗?
理想情况下,我建议您使用 Task
来跟踪您的工作,然后您可以使用:
readyToBeProcessed
.Select(evt => Observable.StartAsync<Unit>(async (cancellationToken) =>
{
//pass cancellationToken to work
var result = await DoWork(cancellationToken);
//test token if needed
return result;
}))
.Switch()
.Subscribe();
当下一个物品到达时,当前令牌将被取消。
我认为 Rx 实际上是一个不错的选择,尽管您可能需要更明确地对其进行建模。
想想实际上有五种类型的事件:项目更改、工作开始和工作结束、无效和成功(我希望我可以使用更好的名称,但我正在努力你写的)。
这是它们如何工作的弹珠图:
t(sec) : 0--1--2--3--4--5--6--7--8--9--10-11-12-13-14-15-16...
item-change : *-*--**-----------------*-------------------------...
do-Work-begins: ---------------------*-----------------*----------...
do-Work-ends : -------------------------*------------------*-----...
invalidate : -------------------------*------------------------...
succeeded : --------------------------------------------*-----...
我们会在项目更改暂停 5 秒后开始工作。如果在工作时间内发生了任何变化,我们希望在工作完成时失效。如果没有,我们要观察成功。
var doWorkBegins = changed
.Select(x => DateTime.Now)
.Throttle(TimeSpan.FromSeconds(5));
var doWorkEnds = doWorkBegins
.SelectMany(x =>
{
Log.Info("Start work...");
// DoWork();
//
// should return an observable that returns a single value when complete.
// If DoWork is just a void, then can use
// return Observable.Return(Unit.Default);
});
var lists = changed
.Buffer(() => doWorkEnds)
.Publish().RefCount();
var succeeded = lists
.Where(l => l.Count == 0);
var invalidate = lists
.Where(l => l.Count > 0);
invalidate.Subscribe(x =>
{
Log.Info("There were changes while we worked... Invalidating");
Invalidate();
}, cancellationToken);
succeeded.Subscribe(x =>
{
Log.Info("Succeeded");
}, cancellationToken);
我正在访问属于不同计算进程的内存区域。 该区域的变化相对较少,我需要在有变化时运行 计算一次。我收到有关更改的通知,但我需要稍等片刻以确保不再进行任何更改。我这样建模:
var readyToBeProcessed = changed
.Select(x => DateTime.Now)
.Throttle(TimeSpan.FromSeconds(5));
但是我的计算需要相当长的时间,而且在我计算的过程中内存可能会发生变化。在这种情况下,我需要将这一轮特定的计算标记为无效。
但是我如何在我的观察者中知道,当我完成计算时,在处理当前事件时是否有另一个事件到达?如果没有事件到达,因为我开始计算,然后它有效,我可以存储结果。
在实践中,事件以允许计算无效的模式(足够快)到达的情况很少见,但我仍然愿意满足这种情况。
注意:我知道我不能保证总是有有效的计算。在内存发生变化和我收到事件之间有一小段时间。完全有可能,序列是这样的 1)我正在计算 2)内存变化 3)我完成计算并检查事件,并确定计算有效 4)内存变化事件到达。我很高兴现在能忍受这个
readyToBeProcessed.Subscribe(x =>
{
Log.Info("Start work...");
// Do calculation here
...
// When finished
if (Is there a new item)
{
Log.Info("There were changes while we worked... Invalidating");
Invalidate();
}
else
{
Log.Info("Succeeded");
}
}, cancellationToken);
Reactive 不适合这项任务吗?
理想情况下,我建议您使用 Task
来跟踪您的工作,然后您可以使用:
readyToBeProcessed
.Select(evt => Observable.StartAsync<Unit>(async (cancellationToken) =>
{
//pass cancellationToken to work
var result = await DoWork(cancellationToken);
//test token if needed
return result;
}))
.Switch()
.Subscribe();
当下一个物品到达时,当前令牌将被取消。
我认为 Rx 实际上是一个不错的选择,尽管您可能需要更明确地对其进行建模。
想想实际上有五种类型的事件:项目更改、工作开始和工作结束、无效和成功(我希望我可以使用更好的名称,但我正在努力你写的)。
这是它们如何工作的弹珠图:
t(sec) : 0--1--2--3--4--5--6--7--8--9--10-11-12-13-14-15-16...
item-change : *-*--**-----------------*-------------------------...
do-Work-begins: ---------------------*-----------------*----------...
do-Work-ends : -------------------------*------------------*-----...
invalidate : -------------------------*------------------------...
succeeded : --------------------------------------------*-----...
我们会在项目更改暂停 5 秒后开始工作。如果在工作时间内发生了任何变化,我们希望在工作完成时失效。如果没有,我们要观察成功。
var doWorkBegins = changed
.Select(x => DateTime.Now)
.Throttle(TimeSpan.FromSeconds(5));
var doWorkEnds = doWorkBegins
.SelectMany(x =>
{
Log.Info("Start work...");
// DoWork();
//
// should return an observable that returns a single value when complete.
// If DoWork is just a void, then can use
// return Observable.Return(Unit.Default);
});
var lists = changed
.Buffer(() => doWorkEnds)
.Publish().RefCount();
var succeeded = lists
.Where(l => l.Count == 0);
var invalidate = lists
.Where(l => l.Count > 0);
invalidate.Subscribe(x =>
{
Log.Info("There were changes while we worked... Invalidating");
Invalidate();
}, cancellationToken);
succeeded.Subscribe(x =>
{
Log.Info("Succeeded");
}, cancellationToken);