我应该如何解析和延迟热响应扩展 IConnectableObservable<T> 流?
How should I Parse and Delay a Hot Reactive Extensions IConnectableObservable<T> stream?
我无法实现这个,所以很遗憾我没有 Rx 代码来显示我所在的位置。我会解释我想做什么,希望有足够专业知识的人能够指导我朝着正确的方向前进。
假设我有一个从数据库中提取的 List TimeObjects 集合。 TimeObject 具有这种形式。
public class TimeObject
{
public DateTime time{get;set;}
public decimal price{get;set;}
}
我想要做的是生成一个 Hot IConnectableObservable 流,其中每个元素由可变时间延迟分隔。
可变时间延迟等于当前和下一个TimeObject的时间差。
例如,如果流是这样的:
s1, s2, s3, ...sN,其中每个 's' 代表 TimeObject
的一个元素
那么s2被推送到流之前的延时为(s2.time - s1.time),
s3 被推送到流之前的时间延迟是 (s3.time - s2.time) 等等。
这是否可能,如果可能,我的实现应该是什么样的?
我已经包含了一个 for 循环,它使用经典方法完成相同的任务。
FireEvent(TimeObject[0]); //Post the first element as an event
for(int _counter = 0; _counter < TimeObject.Count; _counter++)
{
if ( _counter + 1 < TimeObject.Count )
{
TimeObject timeObjectA = TimeObject[_counter];
TimeObject timeObjectB = TimeObject[_counter + 1];
int millisecondsDelay = (timeObjectB.time - timeObjectA.time).Milliseconds;
Thread.Sleep(millisecondsDelay);
FireEvent(timeObjectB); //this posts the latest tick as an event with variable time delay.
}
我想要的是将其转换为 Observable 流。
谢谢。
试试这个:
var observable =
Observable
.Generate(
1,
x => x < timeObjects.Count,
x => x + 1, x => timeObjects[x],
x => timeObjects[x].time.Subtract(timeObjects[x - 1].time)))
.StartWith(timeObjects[0]);
现在,这是一个普通的冷 observable。您可以通过调用 .Publish()
使其变热。你为什么需要它?
它还要求您的列表至少有两个元素
我无法实现这个,所以很遗憾我没有 Rx 代码来显示我所在的位置。我会解释我想做什么,希望有足够专业知识的人能够指导我朝着正确的方向前进。
假设我有一个从数据库中提取的 List TimeObjects 集合。 TimeObject 具有这种形式。
public class TimeObject
{
public DateTime time{get;set;}
public decimal price{get;set;}
}
我想要做的是生成一个 Hot IConnectableObservable 流,其中每个元素由可变时间延迟分隔。
可变时间延迟等于当前和下一个TimeObject的时间差。
例如,如果流是这样的:
s1, s2, s3, ...sN,其中每个 's' 代表 TimeObject
的一个元素那么s2被推送到流之前的延时为(s2.time - s1.time), s3 被推送到流之前的时间延迟是 (s3.time - s2.time) 等等。
这是否可能,如果可能,我的实现应该是什么样的?
我已经包含了一个 for 循环,它使用经典方法完成相同的任务。
FireEvent(TimeObject[0]); //Post the first element as an event
for(int _counter = 0; _counter < TimeObject.Count; _counter++)
{
if ( _counter + 1 < TimeObject.Count )
{
TimeObject timeObjectA = TimeObject[_counter];
TimeObject timeObjectB = TimeObject[_counter + 1];
int millisecondsDelay = (timeObjectB.time - timeObjectA.time).Milliseconds;
Thread.Sleep(millisecondsDelay);
FireEvent(timeObjectB); //this posts the latest tick as an event with variable time delay.
}
我想要的是将其转换为 Observable 流。
谢谢。
试试这个:
var observable =
Observable
.Generate(
1,
x => x < timeObjects.Count,
x => x + 1, x => timeObjects[x],
x => timeObjects[x].time.Subtract(timeObjects[x - 1].time)))
.StartWith(timeObjects[0]);
现在,这是一个普通的冷 observable。您可以通过调用 .Publish()
使其变热。你为什么需要它?
它还要求您的列表至少有两个元素