响应式扩展:拆分输入、处理和连接回去

Reactive Extensions: Split input, process, and concatenate back

基本上,我有一个可观察的输入字符串,我想单独处理它,然后对结果做一些事情。如果输入字符串包含逗号(作为分隔符),我想拆分字符串并单独处理每个子字符串,然后对每个字符串序列进行处理。下面的代码片段说明了我正在尝试做的事情的简化版本:

[Fact]
public void UniTest1()
{
    var observable = new ReplaySubject<string>();
    observable.OnNext("a,b");
    observable.OnNext("c,d,e");
    observable.OnCompleted();

    var result = new List<string[]>();
    observable
        .SelectMany(x => x.Split(','))
        .Select(x => x.ToUpper())
        .ToArray() // How to collect an IEnumerable for each item here?
        .Do(s => result.Add(s))
        .Subscribe();

    // Here, result is actually {{"A","B","C","D","E"}}, I need {{"A","B"},{"C","D","E"}}
    Assert.Equal(2, result.Count);

    Assert.Equal("A", result[0][0]);
    Assert.Equal("B", result[0][1]);

    Assert.Equal("C", result[1][0]);
    Assert.Equal("D", result[1][1]);
    Assert.Equal("E", result[1][2]);
}

正如评论中所解释的,上面的方法不起作用。 .ToArray() 调用将整个 observable 连接成一个序列。

但是,我通过将拆分和处理合并为一个动作解决了这个问题,例如:

[Fact]
public void UniTest2()
{
    var observable = new ReplaySubject<string>();
    observable.OnNext("a,b");
    observable.OnNext("c,d,e");
    observable.OnCompleted();

    var result = new List<string[]>();
    observable
        .Select(x => x.Split(',').Select(s => s.ToUpper()).ToArray())
        .Do(s => result.Add(s))
        .Subscribe();

    // Result is as expected: {{"A","B"},{"C","D","E"}}
    Assert.Equal(2, result.Count);
    Assert.Equal("A", result[0][0]);
    Assert.Equal("B", result[0][1]);
    Assert.Equal("C", result[1][0]);
    Assert.Equal("D", result[1][1]);
    Assert.Equal("E", result[1][2]);
}

但是有没有一种方法,使用 Rx,通过不把拆分和处理放在同一个动作中来解决这个问题?这个问题的推荐解决方案是什么?

我还应该提到处理,即 ToUpper() 调用,实际上是 Web 服务调用。我在我的示例中使用了 ToUpper() 以便我的问题应该很容易解释。但这意味着我希望此处理并行且非阻塞地完成。

如果我没理解错的话,你想保留原来的数组。但是,在 SelectMany 之后,您已直接在流中将数组展平为单个值,因此您无法再将它们改回单独的数组。诀窍是将 ToUpperToArray 移动到 SelectMany.

另外 ToUpper 不是异步函数。它很重要,否则你不会得到任何并行性(我假设它在你的真实代码中,但它使 ToUpper 成为一个糟糕的替代品。)。相反,我将使用 Observable.Timer。如果您的 Web 服务调用还不是可观察的,您需要转换它,但这是一个不同的问题,有点超出这里的范围。

这确实意味着您的结果可能有问题。

new string[] { "a,b", "c,d,e" }.ToObservable()
    .SelectMany(str => str.Split(',')
        .ToObservable()
        .SelectMany(x => Observable.Timer(DateTime.Now.AddSeconds(2))
            .Select(_ => x.ToUpper()))
        .ToArray())
    .Subscribe(arr => { Console.WriteLine(string.Join(",", arr)); });

我在您的代码中注意到的其他一些事情:

    .Do(s => result.Add(s))
    .Subscribe();

你可以把result.Add(s)直接放在Subscribe

    .Subscribe(s => result.Add(s));

事实上,如果您正在编写测试用例,请使用 testSchedulerresults.Messages.AssertEqual

using Microsoft.Reactive.Testing;
using NUnit.Framework;
using System;
using System.Reactive.Linq;

namespace test
{
    [TestFixture]
    public class UnitTests : ReactiveTest
    {

        [Test]
        public void UniTest1()
        {
            var testScheduler = new TestScheduler();

            var source = new string[] { "a,b", "c,d,e" }.ToObservable();

            var results = testScheduler.Start(
                () => source.SelectMany(str => str.Split(',')
                    .ToObservable()
                    .Select(x => x.ToUpper())
                    .ToArray()));

            results.Messages.AssertEqual(
                OnNext<string[]>(Subscribed, new string[] { "A", "B" }),
                OnNext<string[]>(Subscribed, new string[] { "C", "D", "E" }),
                OnCompleted<string[]>(Subscribed)
                );
        }
    }
}

测试 Rx 的有用资源:
http://www.introtorx.com/content/v1.0.10621.0/16_TestingRx.html#TestingRx
http://blogs.msdn.com/b/rxteam/archive/2012/06/14/testing-rx-queries-using-virtual-time-scheduling.aspx
https://msdn.microsoft.com/en-us/library/hh242967%28v=vs.103%29.aspx

您最终在代码中提出了很多值得一提的事情。

首先,.ToArray() 运算符采用 return 零个或多个单个值的可观察对象,并将其更改为 return 零个或多个单个数组的可观察对象值。这样的可观察对象必须先完成,然后才能 return 它的唯一值。

考虑到这一点,第一个查询的结果应该是有意义的。

您使用 x.Split(',').Select(s => s.ToUpper()).ToArray() 的第二个查询产生了您想要的输出,但您想知道 "is there a way, using RX, to solve this problem by NOT putting the splitting and processing in the same action"。

嗯,琐碎的,是的:

var result = new List<string[]>();
observable
    .Select(x => x.Split(','))
    .Select(x => x.Select(s => s.ToUpper()))
    .Select(x => x.ToArray())
    .Do(s => result.Add(s))
    .Subscribe();

但是,这不会并行处理项目。 Rx 设计为串行工作,除非您调用引入并行性的操作。

通常,一种简单的方法是使用长运行宁select,例如.Select(x => longRunningOperation(x)),然后用它来做:

.SelectMany(x => Observable.Start(() => longRunningOperation(x)))

对于您的情况,您可以先这样做:

observable
    .ObserveOn(Scheduler.Default)
    .SelectMany(x => Observable.Start(() => x.Split(',')))
    .SelectMany(x => Observable.Start(() => x.Select(s => s.ToUpper())))
    .SelectMany(x => Observable.Start(() => x.ToArray()))
    .Do(s => result.Add(s))
    .Subscribe();

但这只是并行化每个原始 .OnNext 调用,而不是其中的处理。为此,您需要将 x.Split(',') 的结果转换为可观察对象,并并行处理。

    observable
        .SelectMany(x => Observable.Start(() => x.Split(',').ToObservable()))
        .SelectMany(x => Observable.Start(() => x.SelectMany(s => Observable.Start(() => s.ToUpper()))))
        .SelectMany(x => Observable.Start(() => x.ToArray()))
        .Do(s => s.Do(t => result.Add(t)))
        .Merge()
        .Subscribe();

但这开始看起来很疯狂,它不再 运行 在当前线程上,这意味着您的测试不会等待结果。

让我们重新看一下这个查询。

我已经开始摆脱 .Do 调用。这些通常有利于调试,但对于任何状态更改来说都是不好的。它们可以 运行 在查询中的任何线程上的任何时候,因此您需要确保 .Do 调用中的代码是线程安全的并且调用 result.Add(s)不是线程安全的。

我还引入了一个 "webservice" 调用来用一秒的处理延迟替换 .ToUpper() 以便我们可以看到查询需要多长时间来处理,从而知道它是否 运行并联与否。如果最终查询需要 5 秒到 运行,那么就没有并行性,如果它更少,那么我们就赢了。

所以,如果我以最基本的方式编写查询,它看起来像这样:

Func<string, string> webservice = x =>
{
    Thread.Sleep(1000);
    return x.ToUpper();
};

var query =
    observable
        .Select(ls =>
            from p in ls.Split(',')
            select webservice(p))
        .Select(rs => rs.ToArray())
        .ToArray()
        .Select(rss => new List<string[]>(rss));

var sw = Stopwatch.StartNew();
List<string[]> result = query.Wait();
sw.Stop();

当我 运行 这样做时,我得到了预期的结果 {{"A","B"},{"C","D","E"}},但完成只需要 5 秒多一点。此处没有预期的并行性。

现在让我们介绍一些并行性:

var query =
    observable
        .Select(ls =>
            from p in ls.Split(',').ToObservable()
            from r in Observable.Start(() => webservice(p))
            select r)
        .Select(rs => rs.ToArray())
        .Merge()
        .ToArray()
        .Select(rss => new List<string[]>(rss));

我基本上已经将上面描述的“Select 应用于 SelectMany/Start” 模式。唯一棘手的部分是 .Select(rs => rs.ToArray())IObservable<string[]> 变成了 IObservable<IObservable<string[]>> 所以我弹出 .Merge() 把它弄平了。当您将并行性引入 Rx 查询时,这是正常的。

现在,当我 运行 查询时 - BOOM - 刚好超过一秒钟。所有五个输入都 运行ning 并联。现在唯一的问题是顺序不再是决定性的。但是当结果并行执行时,你无能为力。

一个这样的 运行 我得到了这个结果:

如果我运行将此作为测试,我会将结果按已知顺序排序并将其与预期结果进行比较。