如何关联函数输入和输出?

How to correlate function inputs and outputs?

考虑下面的简单程序。它有一个可观察的整数和一个函数来计算最近发布的整数是偶数还是奇数。出乎意料的是,程序在报告号码已更改之前报告最近的号码是否 even/odd。

static void Main(string[] args) {
    int version = 0;
    var numbers = new Subject<int>();
    IObservable<bool> isNumberEven = numbers.Select(i => i % 2 == 0);
    isNumberEven
        .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
        .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));
    numbers
        .Select(i => new { Number = i, Version = Interlocked.Increment(ref version) })
        .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.Number}"));
    numbers.OnNext(1);
    numbers.OnNext(2);
    numbers.OnNext(3);
    Console.ReadLine();
}

输出为:

Time 1 : False
Time 2 : 1
Time 3 : True
Time 4 : 2
Time 5 : False
Time 6 : 3

我认为改变数字会引发一连串的下游影响,并且这些会按照它们发生的顺序进行报告。交换订阅顺序将交换报告结果的方式。我知道 rx 是异步的,事情有可能以不确定的顺序发生。如果我在我的函数中使用 .Delay() 或网络调用,我无法确定何时会报告结果。但是在这种情况下,我很惊讶。

为什么这很重要?我认为这意味着如果我想尝试关联函数的输入和输出(比如打印发布的数字以及它们是偶数还是奇数),我必须在输出结果中包含输入参数,如下所示:

var isNumberEven = numbers.Select(i => new {
    Number = i,
    IsEven = i % 2 == 0
});

我想我可以构建一堆简单的小函数,然后使用 rx 运算符组合它们来完成复杂的计算。但也许我不能使用 rx 运算符来获得 combine/join/correlate 结果。当我定义每个函数时,我必须自己关联输入和输出。

在某些情况下,我可以使用 rx 运算符关联结果。如果每个输入都产生一个输出,我可以压缩这两个。但是一旦你做了类似 Throttle input 的操作,它就不再起作用了。

这个版本的程序似乎确实以合理的方式报告数字是偶数还是奇数。

static void Main(string[] args) {
    var numbers = new Subject<int>();
    var isNumberEven = numbers.Select(i => i % 2 == 0);
    var publishedNumbers = numbers.Publish().RefCount();
    var report =
        publishedNumbers
        .GroupJoin(
            isNumberEven,
            (_) => publishedNumbers,
            (_) => Observable.Empty<bool>(),
            (n, e) => new { Number = n, IsEven = e })
        .SelectMany(i => i.IsEven.Select(j => new { Number = i.Number, IsEven = j }));
    report.Subscribe(i => Console.WriteLine($"{i.Number} {(i.IsEven ? "even" : "odd")}"));
    numbers.OnNext(1);
    numbers.OnNext(2);
    numbers.OnNext(3);
    Console.ReadLine();
}

输出如下:

1 odd
2 even
3 odd

但我不知道这是巧合还是靠得住。 Rx 中的哪些操作以确定的顺序发生?哪些是不可预测的?我是否应该定义所有函数以在结果中包含输入参数?

您的第一个程序的行为完全符合我的预期,而且是确定性的。

I understand that rx is asynchronous and it is possible for things to happen in non-deterministic order.

如果您引入非确定性行为(如 concurrency/Scheduling),事情只会以非确定性顺序发生,否则 Rx 是确定性的。

这里有几个 issues/misconceptions。 1) 可变外部状态 - version 2) 主题的使用(但在本示例中根本不是问题) 3) 对回调如何发出的误解。

我们只关注 3)。如果我们带您编写代码并将其解包到其基本调用站点,您可能会发现 Rx 在幕后是多么简单。

numbers.OnNext(1); 主题将查找它的订阅,并且 OnNext 每个订阅都按照他们订阅的顺序查找。

IObservable<bool> isNumberEven = numbers.Select(i => i % 2 == 0);
isNumberEven
    .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
    .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));

也可以减为

numbers.Select(i => i % 2 == 0)
    .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
    .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));

一个 可以 争辩说,由于 isNumberEven 从未在其他任何地方使用过,因此您 应该 将其缩减为这个。

所以我们可以看到我们有了第一个订阅者。 实际上 运行 的代码是这个

private void HandleOnNext(int i)
{
    var isEven = i % 2 == 0
    var temp = new { IsEven = isEven , Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp .Version} : {temp .IsEven}");
}

我们的第二个订阅者(因为偶数订阅后调用了.Subscribe(方法),是numbers订阅者。 他的代码可以有效地归结为

private void HandleOnNext(int i)
{
    var temp = new { Number = i, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.Number}");
}

所以一旦你完全解构了代码,你最终基本上会得到这个

void Main()
{
    int version = 0;

    //numbers.OnNext(1);
    ProcessEven(1, ref version);
    ProcessNumber(1, ref version);
    //numbers.OnNext(2);
    ProcessEven(2, ref version);
    ProcessNumber(2, ref version);
    //numbers.OnNext(3);
    ProcessEven(3, ref version);
    ProcessNumber(3, ref version);
}

// Define other methods and classes here
private void ProcessEven(int i, ref int version)
{
    var isEven = i % 2 == 0;
    var temp = new { IsEven = isEven, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.IsEven}");
}
private void ProcessNumber(int i, ref int version)
{
    var temp = new { Number = i, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.Number}");
}

一旦所有的回调和订阅都被具体化,那么你可以看到这并不是什么神奇的事情,一切都是确定性的。

Should I be defining all my functions to include the input parameters in the results?

要回答你的问题(鉴于你对 Rx 的误解,我犹豫是否这样做),你只需要在结果序列的顺序不确定时这样做。 这方面的一个例子可能是您一次发出多个网络请求。 您不能确定他们都会按照您发送给他们的顺序进行回复。 但是,您可以强制这些场景与 Concat

等运算符的使用保持一致