什么类型的可观察 UniRx CombineLatest() 运算符 returns ?冷还是热?
what type of observable UniRx CombineLatest() operator returns ? cold or hot?
我正在使用 UniRx 统一使用流。我正在使用 CombineLatest() 运算符组合两个主题并在其中一个流中发布此值。当我在 .Subscribe() 之外使用生成的值时,OnNext() 被调用并完美运行:
Observable.CombineLatest(positionStream, speed, (position, speed) =>
{
return position + speed;
}).TakeLast(1).Subscribe(a =>
{
last_value = a;
});
positionStream.OnNext(last_value);
但是当我在 .Subscribe() 中调用 OnNext() 时,永远不会调用 .OnNext():
Observable.CombineLatest(positionStream, speed, (position, speed) =>
{
return position + speed;
}).TakeLast(1).Subscribe(a =>
{
positionStream.OnNext(a);
});
有人可以告诉我第二种方法有什么问题吗?我怀疑 CombineLatest() returns 冷可观察,这就是第二种方法不起作用的原因。
您正在呼叫 OnNext
,而主题在中间,处理下一个呼叫的前一个。这会导致堆栈溢出。
如果你想每个时间单位更新一个对象的位置,那么你应该设置一个Interval
。
您正在调用 .TakeLast(1)
,这要求源可观察对象做两件事。 (1) 产生一个值,(2) 完成。您的代码没有显示您的 CombineLatest
是如何完成的,因此看起来好像没有产生任何值。
这是调用订阅的代码版本:
var positionStream = new Subject<int>();
var speed = new Subject<int>();
Observable
.CombineLatest(positionStream, speed, (position, speed) => position + speed)
.TakeLast(1)
.Subscribe(a =>
{
positionStream.OnNext(a);
Console.WriteLine("!");
});
speed.OnNext(42);
positionStream.OnNext(42);
speed.OnNext(42);
positionStream.OnNext(42);
speed.OnCompleted();
positionStream.OnCompleted();
请注意,它只调用一次订阅。
现在,如果我明白你想做什么,你似乎希望能够手动重置位置,但也发送速度值,这也会更新当前位置。
如果正确,试试这个:
var positionStream = new Subject<int>();
var speed = new Subject<int>();
var final_position =
positionStream
.Select(p => speed.Scan(p, (a, x) => a + x).StartWith(p))
.Switch();
final_position
.Subscribe(a => Console.WriteLine(a));
positionStream.OnNext(42);
speed.OnNext(2);
speed.OnNext(5);
positionStream.OnNext(16);
speed.OnNext(4);
产生以下值:
42
44
49
16
20
以下是将所有值推入 positionStream
的方法:
var final_position =
positionStream
.Select(p =>
speed
.Scan(p, (a, x) => a + x)
.Do(y => positionStream.OnNext(y))
.StartWith(p))
.Switch();
我不喜欢使用 Do
来强制产生副作用,但它在这里有效。
我正在使用 UniRx 统一使用流。我正在使用 CombineLatest() 运算符组合两个主题并在其中一个流中发布此值。当我在 .Subscribe() 之外使用生成的值时,OnNext() 被调用并完美运行:
Observable.CombineLatest(positionStream, speed, (position, speed) =>
{
return position + speed;
}).TakeLast(1).Subscribe(a =>
{
last_value = a;
});
positionStream.OnNext(last_value);
但是当我在 .Subscribe() 中调用 OnNext() 时,永远不会调用 .OnNext():
Observable.CombineLatest(positionStream, speed, (position, speed) =>
{
return position + speed;
}).TakeLast(1).Subscribe(a =>
{
positionStream.OnNext(a);
});
有人可以告诉我第二种方法有什么问题吗?我怀疑 CombineLatest() returns 冷可观察,这就是第二种方法不起作用的原因。
您正在呼叫 OnNext
,而主题在中间,处理下一个呼叫的前一个。这会导致堆栈溢出。
如果你想每个时间单位更新一个对象的位置,那么你应该设置一个Interval
。
您正在调用 .TakeLast(1)
,这要求源可观察对象做两件事。 (1) 产生一个值,(2) 完成。您的代码没有显示您的 CombineLatest
是如何完成的,因此看起来好像没有产生任何值。
这是调用订阅的代码版本:
var positionStream = new Subject<int>();
var speed = new Subject<int>();
Observable
.CombineLatest(positionStream, speed, (position, speed) => position + speed)
.TakeLast(1)
.Subscribe(a =>
{
positionStream.OnNext(a);
Console.WriteLine("!");
});
speed.OnNext(42);
positionStream.OnNext(42);
speed.OnNext(42);
positionStream.OnNext(42);
speed.OnCompleted();
positionStream.OnCompleted();
请注意,它只调用一次订阅。
现在,如果我明白你想做什么,你似乎希望能够手动重置位置,但也发送速度值,这也会更新当前位置。
如果正确,试试这个:
var positionStream = new Subject<int>();
var speed = new Subject<int>();
var final_position =
positionStream
.Select(p => speed.Scan(p, (a, x) => a + x).StartWith(p))
.Switch();
final_position
.Subscribe(a => Console.WriteLine(a));
positionStream.OnNext(42);
speed.OnNext(2);
speed.OnNext(5);
positionStream.OnNext(16);
speed.OnNext(4);
产生以下值:
42
44
49
16
20
以下是将所有值推入 positionStream
的方法:
var final_position =
positionStream
.Select(p =>
speed
.Scan(p, (a, x) => a + x)
.Do(y => positionStream.OnNext(y))
.StartWith(p))
.Switch();
我不喜欢使用 Do
来强制产生副作用,但它在这里有效。