什么类型的可观察 UniRx CombineLatest() 运算符 returns ?冷还是热?

what type of observable UniRx CombineLatest() operator returns ? cold or hot?

我正在使用 UniRx 统一使用流。我正在使用 CombineLatest() 运算符组合两个主题并在其中一个流中发布此值。当我在 .Subscribe() 之外使用生成的值时,OnNext() 被调用并完美运行:

Observable.CombineLatest(positionStream, speed, (position, speed) =>
        {
            return position + speed;
        }).TakeLast(1).Subscribe(a =>
          {
              last_value = a;
          });
        positionStream.OnNext(last_value);

但是当我在 .Subscribe() 中调用 OnNext() 时,永远不会调用 .OnNext():

Observable.CombineLatest(positionStream, speed, (position, speed) =>
    {
        return position + speed;
    }).TakeLast(1).Subscribe(a =>
      {
          positionStream.OnNext(a);
      });

有人可以告诉我第二种方法有什么问题吗?我怀疑 CombineLatest() returns 冷可观察,这就是第二种方法不起作用的原因。

您正在呼叫 OnNext,而主题在中间,处理下一个呼叫的前一个。这会导致堆栈溢出。

如果你想每个时间单位更新一个对象的位置,那么你应该设置一个Interval

您正在调用 .TakeLast(1),这要求源可观察对象做两件事。 (1) 产生一个值,(2) 完成。您的代码没有显示您的 CombineLatest 是如何完成的,因此看起来好像没有产生任何值。

这是调用订阅的代码版本:

var positionStream = new Subject<int>();
var speed = new Subject<int>();

Observable
    .CombineLatest(positionStream, speed, (position, speed) => position + speed)
    .TakeLast(1)
    .Subscribe(a =>
    {
        positionStream.OnNext(a);
        Console.WriteLine("!");
    });

speed.OnNext(42);
positionStream.OnNext(42);

speed.OnNext(42);
positionStream.OnNext(42);

speed.OnCompleted();
positionStream.OnCompleted();

请注意,它只调用一次订阅。

现在,如果我明白你想做什么,你似乎希望能够手动重置位置,但也发送速度值,这也会更新当前位置。

如果正确,试试这个:

var positionStream = new Subject<int>();
var speed = new Subject<int>();

var final_position =
    positionStream
        .Select(p => speed.Scan(p, (a, x) => a + x).StartWith(p))
        .Switch();

final_position
    .Subscribe(a => Console.WriteLine(a));

positionStream.OnNext(42);
speed.OnNext(2);
speed.OnNext(5);
positionStream.OnNext(16);
speed.OnNext(4);

产生以下值:

42
44
49
16
20

以下是将所有值推入 positionStream 的方法:

var final_position =
    positionStream
        .Select(p =>
            speed
                .Scan(p, (a, x) => a + x)
                .Do(y => positionStream.OnNext(y))
                .StartWith(p))
        .Switch();

我不喜欢使用 Do 来强制产生副作用,但它在这里有效。