在 Reactive Extensions 中合并 Observables?
Merging Observables in Reactive Extensions?
刚学RX,想做一个迭代文件系统的程序。这是我想出的有效方法:
using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace ConsoleApplication8
{
internal class Program
{
private static IObservable<string> GetFiles(string folder, string filePattern)
{
return Observable.Create<string>(
o =>
{
var files = Directory.GetFiles(folder, filePattern);
foreach (var file in files)
{
o.OnNext(file);
}
var folders = Directory.GetDirectories(folder);
foreach (var f in folders)
{
var x = GetFiles(f, filePattern);
x.Subscribe(p => { o.OnNext(p); });
}
o.OnCompleted();
return Disposable.Empty;
});
}
private static void Main(string[] args)
{
var o = GetFiles(@"d:\temp", "*.*");
o.Subscribe(p => { Console.WriteLine(p); });
Console.Read();
}
}
}
(注意递归的使用,再次调用GetFiles并订阅)
虽然它看起来很笨拙,但我不禁想到我应该使用像 Concat 这样的东西来组合序列,而不是仅仅将它们冒泡备份。
此外,我想将 Foreach 更改为 Parallel.ForEach,但我不确定使用 RX 会产生什么后果。我似乎找不到太多文档。
关于如何使用 RX 更好地编写此代码的任何提示?
原来,根据我的发现SearchOption.AllDirectories(为什么这么多年我从来没有注意到这一点)它可以归结为两行真正的代码:
using System;
using System.IO;
using System.Reactive.Linq;
namespace ConsoleApplication8
{
internal class Program
{
private static void Main(string[] args)
{
var o = Directory.GetFiles(@"e:\code", "*.*", SearchOption.AllDirectories).ToObservable();
o.Subscribe(f => Console.WriteLine(f));
Console.Read();
}
}
}
现在真是太简单了。需要一个新的 RX 问题来解决。
要解决这样的问题,先写一个LINQ版本的函数会有所帮助。例如:
static IEnumerable<string> GetFiles(string folder, string filePattern)
{
return Directory.GetFiles(folder, filePattern)
.Concat(Directory.GetDirectories(folder).SelectMany(f => GetFilesEnumerable(f, filePattern)));
}
然后只需将 IEnumerables 更改为 IObservables:
static IObservable<string> GetFiles(string folder, string filePattern)
{
return Directory.GetFiles(folder, filePattern).ToObservable()
.Concat(Directory.GetDirectories(folder).ToObservable().SelectMany(f => GetFilesEnumerable(f, filePattern)));
}
刚学RX,想做一个迭代文件系统的程序。这是我想出的有效方法:
using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace ConsoleApplication8
{
internal class Program
{
private static IObservable<string> GetFiles(string folder, string filePattern)
{
return Observable.Create<string>(
o =>
{
var files = Directory.GetFiles(folder, filePattern);
foreach (var file in files)
{
o.OnNext(file);
}
var folders = Directory.GetDirectories(folder);
foreach (var f in folders)
{
var x = GetFiles(f, filePattern);
x.Subscribe(p => { o.OnNext(p); });
}
o.OnCompleted();
return Disposable.Empty;
});
}
private static void Main(string[] args)
{
var o = GetFiles(@"d:\temp", "*.*");
o.Subscribe(p => { Console.WriteLine(p); });
Console.Read();
}
}
}
(注意递归的使用,再次调用GetFiles并订阅)
虽然它看起来很笨拙,但我不禁想到我应该使用像 Concat 这样的东西来组合序列,而不是仅仅将它们冒泡备份。
此外,我想将 Foreach 更改为 Parallel.ForEach,但我不确定使用 RX 会产生什么后果。我似乎找不到太多文档。
关于如何使用 RX 更好地编写此代码的任何提示?
原来,根据我的发现SearchOption.AllDirectories(为什么这么多年我从来没有注意到这一点)它可以归结为两行真正的代码:
using System;
using System.IO;
using System.Reactive.Linq;
namespace ConsoleApplication8
{
internal class Program
{
private static void Main(string[] args)
{
var o = Directory.GetFiles(@"e:\code", "*.*", SearchOption.AllDirectories).ToObservable();
o.Subscribe(f => Console.WriteLine(f));
Console.Read();
}
}
}
现在真是太简单了。需要一个新的 RX 问题来解决。
要解决这样的问题,先写一个LINQ版本的函数会有所帮助。例如:
static IEnumerable<string> GetFiles(string folder, string filePattern)
{
return Directory.GetFiles(folder, filePattern)
.Concat(Directory.GetDirectories(folder).SelectMany(f => GetFilesEnumerable(f, filePattern)));
}
然后只需将 IEnumerables 更改为 IObservables:
static IObservable<string> GetFiles(string folder, string filePattern)
{
return Directory.GetFiles(folder, filePattern).ToObservable()
.Concat(Directory.GetDirectories(folder).ToObservable().SelectMany(f => GetFilesEnumerable(f, filePattern)));
}