Rx.NET 生成可取消的可观察文件名的方法是什么?
What is the Rx.NET way to produce a cancellable observable of file names?
我想生成一个文件的可观察对象,以便可以随时取消文件名的发现。对于此示例,取消将在 1 秒后自动进行。
这是我当前的代码:
class Program
{
static void Main()
{
try
{
RunAsync(@"\abc\xyz").GetAwaiter().GetResult();
}
catch (Exception exc)
{
Console.Error.WriteLine(exc);
}
Console.Write("Press Enter to exit");
Console.ReadLine();
}
private static async Task RunAsync(string path)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await GetFileSource(path, cts);
}
private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
{
return Observable.Create<string>(obs => Task.Run(async () =>
{
Console.WriteLine("Inside Before");
foreach (var file in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).Take(50))
{
cts.Token.ThrowIfCancellationRequested();
obs.OnNext(file);
await Task.Delay(100);
}
Console.WriteLine("Inside After");
obs.OnCompleted();
return Disposable.Empty;
}, cts.Token))
.Do(Console.WriteLine);
}
}
我不喜欢我的实现的两个方面(如果有更多 - 请随时指出):
- 我有很多文件,但我手动遍历了每个文件。我能以某种方式使用
ToObservable
扩展名吗?
- 我不知道如何使用传递给
Task.Run
的 cts.Token
。必须使用从外部上下文(GetFileSource
参数)捕获的 cts
。我觉得很丑。
这是应该做的吗?一定是更好的方法。
我建议您在可以使用其他运算符时避免使用 Observable.Create
。
此外,当您在 Observable.Create
中执行 return Disposable.Empty;
时,您正在创建一个不能被普通 Rx 订阅一次性停止的可观察对象。这会导致内存泄漏和不必要的处理。
最后,抛出异常来结束正常计算是个坏主意。
有一个很好的清洁解决方案似乎可以满足您的需求:
private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
{
return
Directory
.EnumerateFiles(path, "*", SearchOption.AllDirectories)
.ToObservable()
.Take(50)
.TakeWhile(f => !cts.IsCancellationRequested);
}
我唯一没有包括的是 Task.Delay(100);
。你为什么要这样做?
我仍然不相信这真的是一个反应性问题,你要求对生产者施加背压,这实际上与反应性的工作方式背道而驰。
也就是说,如果你打算这样做,你应该意识到非常细粒度的时间操作应该几乎总是委托给 Scheduler
而不是试图与 [=13 进行协调=] 和 CancellationTokens
。所以我会重构成这样:
public static IObservable<string> GetFileSource(string path, Func<string, Task<string>> processor, IScheduler scheduler = null) {
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<string>(obs =>
{
//Grab the enumerator as our iteration state.
var enumerator = Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories)
.GetEnumerator();
return scheduler.Schedule(enumerator, async (e, recurse) =>
{
if (!e.MoveNext())
{
obs.OnCompleted();
return;
}
//Wait here until processing is done before moving on
obs.OnNext(await processor(e.Current));
//Recursively schedule
recurse(e);
});
});
}
然后,不传递取消令牌,而是使用 TakeUntil
:
var source = GetFileSource(path, x => {/*Do some async task here*/; return x; })
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1));
您还可以查看 .
实现的更高级示例
我想生成一个文件的可观察对象,以便可以随时取消文件名的发现。对于此示例,取消将在 1 秒后自动进行。
这是我当前的代码:
class Program
{
static void Main()
{
try
{
RunAsync(@"\abc\xyz").GetAwaiter().GetResult();
}
catch (Exception exc)
{
Console.Error.WriteLine(exc);
}
Console.Write("Press Enter to exit");
Console.ReadLine();
}
private static async Task RunAsync(string path)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await GetFileSource(path, cts);
}
private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
{
return Observable.Create<string>(obs => Task.Run(async () =>
{
Console.WriteLine("Inside Before");
foreach (var file in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).Take(50))
{
cts.Token.ThrowIfCancellationRequested();
obs.OnNext(file);
await Task.Delay(100);
}
Console.WriteLine("Inside After");
obs.OnCompleted();
return Disposable.Empty;
}, cts.Token))
.Do(Console.WriteLine);
}
}
我不喜欢我的实现的两个方面(如果有更多 - 请随时指出):
- 我有很多文件,但我手动遍历了每个文件。我能以某种方式使用
ToObservable
扩展名吗? - 我不知道如何使用传递给
Task.Run
的cts.Token
。必须使用从外部上下文(GetFileSource
参数)捕获的cts
。我觉得很丑。
这是应该做的吗?一定是更好的方法。
我建议您在可以使用其他运算符时避免使用 Observable.Create
。
此外,当您在 Observable.Create
中执行 return Disposable.Empty;
时,您正在创建一个不能被普通 Rx 订阅一次性停止的可观察对象。这会导致内存泄漏和不必要的处理。
最后,抛出异常来结束正常计算是个坏主意。
有一个很好的清洁解决方案似乎可以满足您的需求:
private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
{
return
Directory
.EnumerateFiles(path, "*", SearchOption.AllDirectories)
.ToObservable()
.Take(50)
.TakeWhile(f => !cts.IsCancellationRequested);
}
我唯一没有包括的是 Task.Delay(100);
。你为什么要这样做?
我仍然不相信这真的是一个反应性问题,你要求对生产者施加背压,这实际上与反应性的工作方式背道而驰。
也就是说,如果你打算这样做,你应该意识到非常细粒度的时间操作应该几乎总是委托给 Scheduler
而不是试图与 [=13 进行协调=] 和 CancellationTokens
。所以我会重构成这样:
public static IObservable<string> GetFileSource(string path, Func<string, Task<string>> processor, IScheduler scheduler = null) {
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<string>(obs =>
{
//Grab the enumerator as our iteration state.
var enumerator = Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories)
.GetEnumerator();
return scheduler.Schedule(enumerator, async (e, recurse) =>
{
if (!e.MoveNext())
{
obs.OnCompleted();
return;
}
//Wait here until processing is done before moving on
obs.OnNext(await processor(e.Current));
//Recursively schedule
recurse(e);
});
});
}
然后,不传递取消令牌,而是使用 TakeUntil
:
var source = GetFileSource(path, x => {/*Do some async task here*/; return x; })
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1));
您还可以查看