响应式扩展:拆分输入、处理和连接回去
Reactive Extensions: Split input, process, and concatenate back
基本上,我有一个可观察的输入字符串,我想单独处理它,然后对结果做一些事情。如果输入字符串包含逗号(作为分隔符),我想拆分字符串并单独处理每个子字符串,然后对每个字符串序列进行处理。下面的代码片段说明了我正在尝试做的事情的简化版本:
[Fact]
public void UniTest1()
{
var observable = new ReplaySubject<string>();
observable.OnNext("a,b");
observable.OnNext("c,d,e");
observable.OnCompleted();
var result = new List<string[]>();
observable
.SelectMany(x => x.Split(','))
.Select(x => x.ToUpper())
.ToArray() // How to collect an IEnumerable for each item here?
.Do(s => result.Add(s))
.Subscribe();
// Here, result is actually {{"A","B","C","D","E"}}, I need {{"A","B"},{"C","D","E"}}
Assert.Equal(2, result.Count);
Assert.Equal("A", result[0][0]);
Assert.Equal("B", result[0][1]);
Assert.Equal("C", result[1][0]);
Assert.Equal("D", result[1][1]);
Assert.Equal("E", result[1][2]);
}
正如评论中所解释的,上面的方法不起作用。 .ToArray() 调用将整个 observable 连接成一个序列。
但是,我通过将拆分和处理合并为一个动作解决了这个问题,例如:
[Fact]
public void UniTest2()
{
var observable = new ReplaySubject<string>();
observable.OnNext("a,b");
observable.OnNext("c,d,e");
observable.OnCompleted();
var result = new List<string[]>();
observable
.Select(x => x.Split(',').Select(s => s.ToUpper()).ToArray())
.Do(s => result.Add(s))
.Subscribe();
// Result is as expected: {{"A","B"},{"C","D","E"}}
Assert.Equal(2, result.Count);
Assert.Equal("A", result[0][0]);
Assert.Equal("B", result[0][1]);
Assert.Equal("C", result[1][0]);
Assert.Equal("D", result[1][1]);
Assert.Equal("E", result[1][2]);
}
但是有没有一种方法,使用 Rx,通过不把拆分和处理放在同一个动作中来解决这个问题?这个问题的推荐解决方案是什么?
我还应该提到处理,即 ToUpper() 调用,实际上是 Web 服务调用。我在我的示例中使用了 ToUpper() 以便我的问题应该很容易解释。但这意味着我希望此处理并行且非阻塞地完成。
如果我没理解错的话,你想保留原来的数组。但是,在 SelectMany
之后,您已直接在流中将数组展平为单个值,因此您无法再将它们改回单独的数组。诀窍是将 ToUpper
和 ToArray
移动到 SelectMany
.
内
另外 ToUpper
不是异步函数。它很重要,否则你不会得到任何并行性(我假设它在你的真实代码中,但它使 ToUpper
成为一个糟糕的替代品。)。相反,我将使用 Observable.Timer
。如果您的 Web 服务调用还不是可观察的,您需要转换它,但这是一个不同的问题,有点超出这里的范围。
这确实意味着您的结果可能有问题。
new string[] { "a,b", "c,d,e" }.ToObservable()
.SelectMany(str => str.Split(',')
.ToObservable()
.SelectMany(x => Observable.Timer(DateTime.Now.AddSeconds(2))
.Select(_ => x.ToUpper()))
.ToArray())
.Subscribe(arr => { Console.WriteLine(string.Join(",", arr)); });
我在您的代码中注意到的其他一些事情:
.Do(s => result.Add(s))
.Subscribe();
你可以把result.Add(s)
直接放在Subscribe
.Subscribe(s => result.Add(s));
事实上,如果您正在编写测试用例,请使用 testScheduler
和 results.Messages.AssertEqual
。
using Microsoft.Reactive.Testing;
using NUnit.Framework;
using System;
using System.Reactive.Linq;
namespace test
{
[TestFixture]
public class UnitTests : ReactiveTest
{
[Test]
public void UniTest1()
{
var testScheduler = new TestScheduler();
var source = new string[] { "a,b", "c,d,e" }.ToObservable();
var results = testScheduler.Start(
() => source.SelectMany(str => str.Split(',')
.ToObservable()
.Select(x => x.ToUpper())
.ToArray()));
results.Messages.AssertEqual(
OnNext<string[]>(Subscribed, new string[] { "A", "B" }),
OnNext<string[]>(Subscribed, new string[] { "C", "D", "E" }),
OnCompleted<string[]>(Subscribed)
);
}
}
}
测试 Rx 的有用资源:
http://www.introtorx.com/content/v1.0.10621.0/16_TestingRx.html#TestingRx
http://blogs.msdn.com/b/rxteam/archive/2012/06/14/testing-rx-queries-using-virtual-time-scheduling.aspx
https://msdn.microsoft.com/en-us/library/hh242967%28v=vs.103%29.aspx
您最终在代码中提出了很多值得一提的事情。
首先,.ToArray()
运算符采用 return 零个或多个单个值的可观察对象,并将其更改为 return 零个或多个单个数组的可观察对象值。这样的可观察对象必须先完成,然后才能 return 它的唯一值。
考虑到这一点,第一个查询的结果应该是有意义的。
您使用 x.Split(',').Select(s => s.ToUpper()).ToArray()
的第二个查询产生了您想要的输出,但您想知道 "is there a way, using RX, to solve this problem by NOT putting the splitting and processing in the same action"。
嗯,琐碎的,是的:
var result = new List<string[]>();
observable
.Select(x => x.Split(','))
.Select(x => x.Select(s => s.ToUpper()))
.Select(x => x.ToArray())
.Do(s => result.Add(s))
.Subscribe();
但是,这不会并行处理项目。 Rx 设计为串行工作,除非您调用引入并行性的操作。
通常,一种简单的方法是使用长运行宁select,例如.Select(x => longRunningOperation(x))
,然后用它来做:
.SelectMany(x => Observable.Start(() => longRunningOperation(x)))
对于您的情况,您可以先这样做:
observable
.ObserveOn(Scheduler.Default)
.SelectMany(x => Observable.Start(() => x.Split(',')))
.SelectMany(x => Observable.Start(() => x.Select(s => s.ToUpper())))
.SelectMany(x => Observable.Start(() => x.ToArray()))
.Do(s => result.Add(s))
.Subscribe();
但这只是并行化每个原始 .OnNext
调用,而不是其中的处理。为此,您需要将 x.Split(',')
的结果转换为可观察对象,并并行处理。
observable
.SelectMany(x => Observable.Start(() => x.Split(',').ToObservable()))
.SelectMany(x => Observable.Start(() => x.SelectMany(s => Observable.Start(() => s.ToUpper()))))
.SelectMany(x => Observable.Start(() => x.ToArray()))
.Do(s => s.Do(t => result.Add(t)))
.Merge()
.Subscribe();
但这开始看起来很疯狂,它不再 运行 在当前线程上,这意味着您的测试不会等待结果。
让我们重新看一下这个查询。
我已经开始摆脱 .Do
调用。这些通常有利于调试,但对于任何状态更改来说都是不好的。它们可以 运行 在查询中的任何线程上的任何时候,因此您需要确保 .Do
调用中的代码是线程安全的并且调用 result.Add(s)
是 不是线程安全的。
我还引入了一个 "webservice" 调用来用一秒的处理延迟替换 .ToUpper()
以便我们可以看到查询需要多长时间来处理,从而知道它是否 运行并联与否。如果最终查询需要 5 秒到 运行,那么就没有并行性,如果它更少,那么我们就赢了。
所以,如果我以最基本的方式编写查询,它看起来像这样:
Func<string, string> webservice = x =>
{
Thread.Sleep(1000);
return x.ToUpper();
};
var query =
observable
.Select(ls =>
from p in ls.Split(',')
select webservice(p))
.Select(rs => rs.ToArray())
.ToArray()
.Select(rss => new List<string[]>(rss));
var sw = Stopwatch.StartNew();
List<string[]> result = query.Wait();
sw.Stop();
当我 运行 这样做时,我得到了预期的结果 {{"A","B"},{"C","D","E"}}
,但完成只需要 5 秒多一点。此处没有预期的并行性。
现在让我们介绍一些并行性:
var query =
observable
.Select(ls =>
from p in ls.Split(',').ToObservable()
from r in Observable.Start(() => webservice(p))
select r)
.Select(rs => rs.ToArray())
.Merge()
.ToArray()
.Select(rss => new List<string[]>(rss));
我基本上已经将上面描述的“Select
应用于 SelectMany
/Start
” 模式。唯一棘手的部分是 .Select(rs => rs.ToArray())
从 IObservable<string[]>
变成了 IObservable<IObservable<string[]>>
所以我弹出 .Merge()
把它弄平了。当您将并行性引入 Rx 查询时,这是正常的。
现在,当我 运行 查询时 - BOOM - 刚好超过一秒钟。所有五个输入都 运行ning 并联。现在唯一的问题是顺序不再是决定性的。但是当结果并行执行时,你无能为力。
一个这样的 运行 我得到了这个结果:
如果我运行将此作为测试,我会将结果按已知顺序排序并将其与预期结果进行比较。
基本上,我有一个可观察的输入字符串,我想单独处理它,然后对结果做一些事情。如果输入字符串包含逗号(作为分隔符),我想拆分字符串并单独处理每个子字符串,然后对每个字符串序列进行处理。下面的代码片段说明了我正在尝试做的事情的简化版本:
[Fact]
public void UniTest1()
{
var observable = new ReplaySubject<string>();
observable.OnNext("a,b");
observable.OnNext("c,d,e");
observable.OnCompleted();
var result = new List<string[]>();
observable
.SelectMany(x => x.Split(','))
.Select(x => x.ToUpper())
.ToArray() // How to collect an IEnumerable for each item here?
.Do(s => result.Add(s))
.Subscribe();
// Here, result is actually {{"A","B","C","D","E"}}, I need {{"A","B"},{"C","D","E"}}
Assert.Equal(2, result.Count);
Assert.Equal("A", result[0][0]);
Assert.Equal("B", result[0][1]);
Assert.Equal("C", result[1][0]);
Assert.Equal("D", result[1][1]);
Assert.Equal("E", result[1][2]);
}
正如评论中所解释的,上面的方法不起作用。 .ToArray() 调用将整个 observable 连接成一个序列。
但是,我通过将拆分和处理合并为一个动作解决了这个问题,例如:
[Fact]
public void UniTest2()
{
var observable = new ReplaySubject<string>();
observable.OnNext("a,b");
observable.OnNext("c,d,e");
observable.OnCompleted();
var result = new List<string[]>();
observable
.Select(x => x.Split(',').Select(s => s.ToUpper()).ToArray())
.Do(s => result.Add(s))
.Subscribe();
// Result is as expected: {{"A","B"},{"C","D","E"}}
Assert.Equal(2, result.Count);
Assert.Equal("A", result[0][0]);
Assert.Equal("B", result[0][1]);
Assert.Equal("C", result[1][0]);
Assert.Equal("D", result[1][1]);
Assert.Equal("E", result[1][2]);
}
但是有没有一种方法,使用 Rx,通过不把拆分和处理放在同一个动作中来解决这个问题?这个问题的推荐解决方案是什么?
我还应该提到处理,即 ToUpper() 调用,实际上是 Web 服务调用。我在我的示例中使用了 ToUpper() 以便我的问题应该很容易解释。但这意味着我希望此处理并行且非阻塞地完成。
如果我没理解错的话,你想保留原来的数组。但是,在 SelectMany
之后,您已直接在流中将数组展平为单个值,因此您无法再将它们改回单独的数组。诀窍是将 ToUpper
和 ToArray
移动到 SelectMany
.
另外 ToUpper
不是异步函数。它很重要,否则你不会得到任何并行性(我假设它在你的真实代码中,但它使 ToUpper
成为一个糟糕的替代品。)。相反,我将使用 Observable.Timer
。如果您的 Web 服务调用还不是可观察的,您需要转换它,但这是一个不同的问题,有点超出这里的范围。
这确实意味着您的结果可能有问题。
new string[] { "a,b", "c,d,e" }.ToObservable()
.SelectMany(str => str.Split(',')
.ToObservable()
.SelectMany(x => Observable.Timer(DateTime.Now.AddSeconds(2))
.Select(_ => x.ToUpper()))
.ToArray())
.Subscribe(arr => { Console.WriteLine(string.Join(",", arr)); });
我在您的代码中注意到的其他一些事情:
.Do(s => result.Add(s))
.Subscribe();
你可以把result.Add(s)
直接放在Subscribe
.Subscribe(s => result.Add(s));
事实上,如果您正在编写测试用例,请使用 testScheduler
和 results.Messages.AssertEqual
。
using Microsoft.Reactive.Testing;
using NUnit.Framework;
using System;
using System.Reactive.Linq;
namespace test
{
[TestFixture]
public class UnitTests : ReactiveTest
{
[Test]
public void UniTest1()
{
var testScheduler = new TestScheduler();
var source = new string[] { "a,b", "c,d,e" }.ToObservable();
var results = testScheduler.Start(
() => source.SelectMany(str => str.Split(',')
.ToObservable()
.Select(x => x.ToUpper())
.ToArray()));
results.Messages.AssertEqual(
OnNext<string[]>(Subscribed, new string[] { "A", "B" }),
OnNext<string[]>(Subscribed, new string[] { "C", "D", "E" }),
OnCompleted<string[]>(Subscribed)
);
}
}
}
测试 Rx 的有用资源:
http://www.introtorx.com/content/v1.0.10621.0/16_TestingRx.html#TestingRx
http://blogs.msdn.com/b/rxteam/archive/2012/06/14/testing-rx-queries-using-virtual-time-scheduling.aspx
https://msdn.microsoft.com/en-us/library/hh242967%28v=vs.103%29.aspx
您最终在代码中提出了很多值得一提的事情。
首先,.ToArray()
运算符采用 return 零个或多个单个值的可观察对象,并将其更改为 return 零个或多个单个数组的可观察对象值。这样的可观察对象必须先完成,然后才能 return 它的唯一值。
考虑到这一点,第一个查询的结果应该是有意义的。
您使用 x.Split(',').Select(s => s.ToUpper()).ToArray()
的第二个查询产生了您想要的输出,但您想知道 "is there a way, using RX, to solve this problem by NOT putting the splitting and processing in the same action"。
嗯,琐碎的,是的:
var result = new List<string[]>();
observable
.Select(x => x.Split(','))
.Select(x => x.Select(s => s.ToUpper()))
.Select(x => x.ToArray())
.Do(s => result.Add(s))
.Subscribe();
但是,这不会并行处理项目。 Rx 设计为串行工作,除非您调用引入并行性的操作。
通常,一种简单的方法是使用长运行宁select,例如.Select(x => longRunningOperation(x))
,然后用它来做:
.SelectMany(x => Observable.Start(() => longRunningOperation(x)))
对于您的情况,您可以先这样做:
observable
.ObserveOn(Scheduler.Default)
.SelectMany(x => Observable.Start(() => x.Split(',')))
.SelectMany(x => Observable.Start(() => x.Select(s => s.ToUpper())))
.SelectMany(x => Observable.Start(() => x.ToArray()))
.Do(s => result.Add(s))
.Subscribe();
但这只是并行化每个原始 .OnNext
调用,而不是其中的处理。为此,您需要将 x.Split(',')
的结果转换为可观察对象,并并行处理。
observable
.SelectMany(x => Observable.Start(() => x.Split(',').ToObservable()))
.SelectMany(x => Observable.Start(() => x.SelectMany(s => Observable.Start(() => s.ToUpper()))))
.SelectMany(x => Observable.Start(() => x.ToArray()))
.Do(s => s.Do(t => result.Add(t)))
.Merge()
.Subscribe();
但这开始看起来很疯狂,它不再 运行 在当前线程上,这意味着您的测试不会等待结果。
让我们重新看一下这个查询。
我已经开始摆脱 .Do
调用。这些通常有利于调试,但对于任何状态更改来说都是不好的。它们可以 运行 在查询中的任何线程上的任何时候,因此您需要确保 .Do
调用中的代码是线程安全的并且调用 result.Add(s)
是 不是线程安全的。
我还引入了一个 "webservice" 调用来用一秒的处理延迟替换 .ToUpper()
以便我们可以看到查询需要多长时间来处理,从而知道它是否 运行并联与否。如果最终查询需要 5 秒到 运行,那么就没有并行性,如果它更少,那么我们就赢了。
所以,如果我以最基本的方式编写查询,它看起来像这样:
Func<string, string> webservice = x =>
{
Thread.Sleep(1000);
return x.ToUpper();
};
var query =
observable
.Select(ls =>
from p in ls.Split(',')
select webservice(p))
.Select(rs => rs.ToArray())
.ToArray()
.Select(rss => new List<string[]>(rss));
var sw = Stopwatch.StartNew();
List<string[]> result = query.Wait();
sw.Stop();
当我 运行 这样做时,我得到了预期的结果 {{"A","B"},{"C","D","E"}}
,但完成只需要 5 秒多一点。此处没有预期的并行性。
现在让我们介绍一些并行性:
var query =
observable
.Select(ls =>
from p in ls.Split(',').ToObservable()
from r in Observable.Start(() => webservice(p))
select r)
.Select(rs => rs.ToArray())
.Merge()
.ToArray()
.Select(rss => new List<string[]>(rss));
我基本上已经将上面描述的“Select
应用于 SelectMany
/Start
” 模式。唯一棘手的部分是 .Select(rs => rs.ToArray())
从 IObservable<string[]>
变成了 IObservable<IObservable<string[]>>
所以我弹出 .Merge()
把它弄平了。当您将并行性引入 Rx 查询时,这是正常的。
现在,当我 运行 查询时 - BOOM - 刚好超过一秒钟。所有五个输入都 运行ning 并联。现在唯一的问题是顺序不再是决定性的。但是当结果并行执行时,你无能为力。
一个这样的 运行 我得到了这个结果:
如果我运行将此作为测试,我会将结果按已知顺序排序并将其与预期结果进行比较。