文件监控系统响应式编程

File Monitoring System Reactive Programming

我正在使用 C#。我是响应式编程的新手。 使用反应式编程,我想创建一个文件夹监控系统,如果文件夹 A 包含任何文件,如果是,它将调用该系统,然后它将获取该文件并处理它并将其移动到文件夹 B 中。 比方说,文件夹 A 是空的 first.User 将一些文件实时添加到文件夹 A 中。系统检测到有新文件被添加,它会一个一个或同时处理它。 我无法理解我应该使用 Create 或 Interval 什么,然后我的处理代码将写在哪里 请帮助我

这应该相当接近:

var query =
    Observable
        .Using(
            () =>
            {
                var fsw = new FileSystemWatcher(@"C:\A");
                fsw.EnableRaisingEvents = true;
                return fsw;
            },
            fsw => Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                h => fsw.Created += h,
                h => fsw.Created -= h))
        .Delay(TimeSpan.FromSeconds(0.1));


query
    .Subscribe(x => File.Move(x.EventArgs.FullPath, Path.Combine(@"C:\B", x.EventArgs.Name)));

FileSystemWatcher有一个比较小InternalBufferSize (8 KB by default, 64 KB max), that can be easily exceeded if a burst of file system changes happens in a short time span, and the event handlers of the FileSystemWatcher are doing anything time consuming. The documentation给出这样的建议:

Keep your event handling code as short as possible.

超出缓冲区的后果很严重:所有缓冲的通知都将丢失。在大多数情况下,这应该是非常不受欢迎的,如果不是完全不能接受的话。因此,要避免在事件调用的同一线程上同步执行繁重的文件移动操作。实现所需异步的一种简单方法是在处理程序和订阅代码之间注入 Delay。一种更复杂的方法是将传入的通知排队,并按顺序或以有限的并发处理每个文件。 Merge 运算符可用于排队和并发控制。这是一个例子¹:

IObservable<Unit> query = Observable
    .Using(() =>
        {
            var fsw = new FileSystemWatcher(@"C:\A");
            fsw.EnableRaisingEvents = true;
            return fsw;
        },
        fsw => Observable.FromEventPattern<FileSystemEventHandler,
            FileSystemEventArgs>(h => fsw.Created += h, h => fsw.Created -= h)
    )
    .Delay(TimeSpan.FromSeconds(0.1))
    .Select(x => Observable.Defer(() => Observable.Start(() =>
    {
        File.Move(x.EventArgs.FullPath, Path.Combine(@"C:\B", x.EventArgs.Name));
    })))
    .Merge(maxConcurrent: 2);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

Task<Unit> task = query.ToTask(cts.Token); // Start the file-watching

Observable.Defer+Observable.Start 组合用作异步 Observable.FromAsync 的同步等价物(因为 File.Move 方法是同步的)。

¹ 它是 Enigmativity 的 的修改版本。