在 RX.net 中连接可观察对象

Connecting observables in RX.net

我正在尝试使用 RX.net 制作一个小管道,但我无法弄清楚如何在不将每个进程的代码嵌套在管道中的情况下进行操作,这使得代码非常混乱。

这基本上就是我想做的

inputString -> toUpperCase -> reverseString -> printOutput

这是我开始工作的代码,但如果我想向管道中添加更多东西,它就不会很漂亮。

        var inputObservable = Observable.Return("hello world");

        inputObservable.Subscribe(inputString =>
        {
            var toUpperCaseObservable = Observable.Return(inputString.ToUpper());
            toUpperCaseObservable.Subscribe(toUpperCaseInput =>
            {
                var reverseStringObservable = Observable.Return(new String(toUpperCaseInput.Reverse().ToArray()));
                reverseStringObservable.Subscribe(reverseStringInput =>
                {
                    var writeOutputObservable = Observable.Return(reverseStringInput);
                    writeOutputObservable.Subscribe(input =>
                    {
                        Console.WriteLine(input);
                        Console.ReadLine();
                    });      
                });                    
            });

        });

您真的不想以这种方式创建 Rx "pipeline"。像这样的嵌套订阅并不是真正的 Rx 管道。

以下是您在基本 Rx query/subscribe 方法中的操作方式:

    var query =
        from x in Observable.Return("hello world")
        let y = x.ToUpper()
        select new String(y.Reverse().ToArray());

    query.Subscribe(z =>
    {
        Console.WriteLine(z);
        Console.ReadLine();
    });

简单得多,但出于说明目的,您的操作可能很昂贵,因此这可能是更好的方法:

    var query =
        from x in Observable.Return("hello world")
        from y in Observable.Start(() => x.ToUpper())
        from z in Observable.Start(() => new String(y.Reverse().ToArray()))
        select z;

    query.Subscribe(z =>
    {
        Console.WriteLine(z);
        Console.ReadLine();
    });

一般规则是,在您准备好对管道结果执行 操作 之前,您不应在管道结束之前订阅。如果您需要执行 计算 那么它应该保留在查询管道中。

这也行

var inputObservable = Observable.Return("hello world")
    .Select(s=>s.ToUpper())
    .Select(upperCased=>new String(upperCased.Reverse().ToArray());

inputObservable.Subscribe(str =>
{
    Console.WriteLine(str);
    Console.ReadLine();
});