文件监控系统响应式编程
File Monitoring System Reactive Programming
我正在使用 C#。我是响应式编程的新手。
使用反应式编程,我想创建一个文件夹监控系统,如果文件夹 A 包含任何文件,如果是,它将调用该系统,然后它将获取该文件并处理它并将其移动到文件夹 B 中。
比方说,文件夹 A 是空的 first.User 将一些文件实时添加到文件夹 A 中。系统检测到有新文件被添加,它会一个一个或同时处理它。
我无法理解我应该使用 Create 或 Interval 什么,然后我的处理代码将写在哪里
请帮助我
这应该相当接近:
var query =
Observable
.Using(
() =>
{
var fsw = new FileSystemWatcher(@"C:\A");
fsw.EnableRaisingEvents = true;
return fsw;
},
fsw => Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Created += h,
h => fsw.Created -= h))
.Delay(TimeSpan.FromSeconds(0.1));
query
.Subscribe(x => File.Move(x.EventArgs.FullPath, Path.Combine(@"C:\B", x.EventArgs.Name)));
FileSystemWatcher
有一个比较小InternalBufferSize
(8 KB by default, 64 KB max), that can be easily exceeded if a burst of file system changes happens in a short time span, and the event handlers of the FileSystemWatcher
are doing anything time consuming. The documentation给出这样的建议:
Keep your event handling code as short as possible.
超出缓冲区的后果很严重:所有缓冲的通知都将丢失。在大多数情况下,这应该是非常不受欢迎的,如果不是完全不能接受的话。因此,要避免在事件调用的同一线程上同步执行繁重的文件移动操作。实现所需异步的一种简单方法是在处理程序和订阅代码之间注入 Delay
。一种更复杂的方法是将传入的通知排队,并按顺序或以有限的并发处理每个文件。 Merge
运算符可用于排队和并发控制。这是一个例子¹:
IObservable<Unit> query = Observable
.Using(() =>
{
var fsw = new FileSystemWatcher(@"C:\A");
fsw.EnableRaisingEvents = true;
return fsw;
},
fsw => Observable.FromEventPattern<FileSystemEventHandler,
FileSystemEventArgs>(h => fsw.Created += h, h => fsw.Created -= h)
)
.Delay(TimeSpan.FromSeconds(0.1))
.Select(x => Observable.Defer(() => Observable.Start(() =>
{
File.Move(x.EventArgs.FullPath, Path.Combine(@"C:\B", x.EventArgs.Name));
})))
.Merge(maxConcurrent: 2);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
Task<Unit> task = query.ToTask(cts.Token); // Start the file-watching
Observable.Defer
+Observable.Start
组合用作异步 Observable.FromAsync
的同步等价物(因为 File.Move
方法是同步的)。
¹ 它是 Enigmativity 的 的修改版本。
我正在使用 C#。我是响应式编程的新手。 使用反应式编程,我想创建一个文件夹监控系统,如果文件夹 A 包含任何文件,如果是,它将调用该系统,然后它将获取该文件并处理它并将其移动到文件夹 B 中。 比方说,文件夹 A 是空的 first.User 将一些文件实时添加到文件夹 A 中。系统检测到有新文件被添加,它会一个一个或同时处理它。 我无法理解我应该使用 Create 或 Interval 什么,然后我的处理代码将写在哪里 请帮助我
这应该相当接近:
var query =
Observable
.Using(
() =>
{
var fsw = new FileSystemWatcher(@"C:\A");
fsw.EnableRaisingEvents = true;
return fsw;
},
fsw => Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Created += h,
h => fsw.Created -= h))
.Delay(TimeSpan.FromSeconds(0.1));
query
.Subscribe(x => File.Move(x.EventArgs.FullPath, Path.Combine(@"C:\B", x.EventArgs.Name)));
FileSystemWatcher
有一个比较小InternalBufferSize
(8 KB by default, 64 KB max), that can be easily exceeded if a burst of file system changes happens in a short time span, and the event handlers of the FileSystemWatcher
are doing anything time consuming. The documentation给出这样的建议:
Keep your event handling code as short as possible.
超出缓冲区的后果很严重:所有缓冲的通知都将丢失。在大多数情况下,这应该是非常不受欢迎的,如果不是完全不能接受的话。因此,要避免在事件调用的同一线程上同步执行繁重的文件移动操作。实现所需异步的一种简单方法是在处理程序和订阅代码之间注入 Delay
。一种更复杂的方法是将传入的通知排队,并按顺序或以有限的并发处理每个文件。 Merge
运算符可用于排队和并发控制。这是一个例子¹:
IObservable<Unit> query = Observable
.Using(() =>
{
var fsw = new FileSystemWatcher(@"C:\A");
fsw.EnableRaisingEvents = true;
return fsw;
},
fsw => Observable.FromEventPattern<FileSystemEventHandler,
FileSystemEventArgs>(h => fsw.Created += h, h => fsw.Created -= h)
)
.Delay(TimeSpan.FromSeconds(0.1))
.Select(x => Observable.Defer(() => Observable.Start(() =>
{
File.Move(x.EventArgs.FullPath, Path.Combine(@"C:\B", x.EventArgs.Name));
})))
.Merge(maxConcurrent: 2);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
Task<Unit> task = query.ToTask(cts.Token); // Start the file-watching
Observable.Defer
+Observable.Start
组合用作异步 Observable.FromAsync
的同步等价物(因为 File.Move
方法是同步的)。
¹ 它是 Enigmativity 的