Observable.Range 被重复?
Observable.Range being repeated?
Rx 的新手 -- 我有一个序列似乎正常运行,除了它似乎重复这一事实。
我想我在调用 Select()
或 SelectMany()
时遗漏了一些触发范围重新评估的东西。
代码说明和我想做什么
- 对于所有数字,循环检索数据(从数据库分页)的方法。
- 最终,此数据将为空(我只想在检索数据时继续处理
- 对于检索到的每条记录,我只想处理应该处理的记录
- 在应该处理的那些中,我想并行处理最多
x
个(根据设置)。
- 我想等到整个序列完成后退出该方法(因此在最后调用 wait)。
下面的代码有问题
- 我 运行 代码通过一个数据集,我知道只有 1 个项目。
- 因此,第 0 页 return 有 1 个项目,第 1 页 return 有 0 个项目。
- 我的期望是该过程 运行 对一个项目进行一次。
- 但是,我看到页面 0 和 1 都被调用了两次,因此该过程 运行s 两次。
我认为这与导致范围从 0 开始重新计算的调用有关,但我不知道它是什么。
代码
var query = Observable.Range(0, int.MaxValue)
.Select(pageNum =>
{
_etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
})
.TakeWhile(resProfList => resProfList.Any())
.SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
.Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
.Merge(maxConcurrent: _processorSettings.ParallelProperties)
.Do(async trackingRequests =>
{
await CreateRequests(trackingRequests.Result, createTrackingPayload);
var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
TrackingRecordRequestType.UpdateAssignmentType);
_etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
});
var subscription = query.Subscribe(
trackingRequests =>
{
//Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
_etlLogger.Info("Finished! Woohoo!");
});
await query.Wait();
这是因为您订阅了两次序列。一次在 query.Subscribe(...)
,一次在 query.Wait()
。
Observable.Range(0, int.MaxValue)
是冷可观察量。每次订阅,都会重新评价。您可以通过使用 Publish()
发布它来使 observable 变热,然后订阅它,然后 Connect()
然后 Wait()
。如果在最后一个元素已经生成后调用 Wait()
,这确实会增加获得 InvalidOperationException
的风险。更好的选择是 LastOrDefaultAsync()
.
那会给你这样的东西:
var connectable = query.Publish();
var subscription = connectable.Subscribe(...);
subscription = new CompositeDisposable(connectable.Connect(), subscription);
await connectable.LastOrDefaultAsync();
或者您可以直接使用 ToTask()
避免等待和 return 任务(请从您的方法签名中删除异步)。
return connectable.LastOrDefaultAsync().ToTask();
转换为任务后,可以用Wait()
同步等待(不要混淆Task.Wait()
和Observable.Wait()
)。
connectable.LastOrDefaultAsync().ToTask().Wait();
但是,您很可能根本不想等待!在异步上下文中等待毫无意义。你应该做什么它把序列完成后需要 运行 的剩余代码放在订阅的 OnComplete()
部分。如果您有(清理)代码需要 运行,即使您取消订阅(处置),请考虑 Observable.Using
或 Finally(...)
方法以确保此代码是 运行。
如前所述,Observable.Range
重复的原因是您订阅了两次 - 一次是 .Subscribe(...)
,一次是 .Wait()
。
在这种情况下,我会使用非常简单的阻塞调用来获取值。只需这样做:
var results = query.ToArray().Wait();
.ToArray()
将多值 IObservable<T>
变成单值 IObservable<T[]>
。 .Wait()
将其变为 T[]
。这是确保只有一个订阅、阻止和获取所有值的简单方法。
在您的情况下,您可能不需要所有值,但我认为这是养成的好习惯。
Rx 的新手 -- 我有一个序列似乎正常运行,除了它似乎重复这一事实。
我想我在调用 Select()
或 SelectMany()
时遗漏了一些触发范围重新评估的东西。
代码说明和我想做什么
- 对于所有数字,循环检索数据(从数据库分页)的方法。
- 最终,此数据将为空(我只想在检索数据时继续处理
- 对于检索到的每条记录,我只想处理应该处理的记录
- 在应该处理的那些中,我想并行处理最多
x
个(根据设置)。 - 我想等到整个序列完成后退出该方法(因此在最后调用 wait)。
下面的代码有问题
- 我 运行 代码通过一个数据集,我知道只有 1 个项目。
- 因此,第 0 页 return 有 1 个项目,第 1 页 return 有 0 个项目。
- 我的期望是该过程 运行 对一个项目进行一次。
- 但是,我看到页面 0 和 1 都被调用了两次,因此该过程 运行s 两次。
我认为这与导致范围从 0 开始重新计算的调用有关,但我不知道它是什么。
代码
var query = Observable.Range(0, int.MaxValue)
.Select(pageNum =>
{
_etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
})
.TakeWhile(resProfList => resProfList.Any())
.SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
.Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
.Merge(maxConcurrent: _processorSettings.ParallelProperties)
.Do(async trackingRequests =>
{
await CreateRequests(trackingRequests.Result, createTrackingPayload);
var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
TrackingRecordRequestType.UpdateAssignmentType);
_etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
});
var subscription = query.Subscribe(
trackingRequests =>
{
//Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
_etlLogger.Info("Finished! Woohoo!");
});
await query.Wait();
这是因为您订阅了两次序列。一次在 query.Subscribe(...)
,一次在 query.Wait()
。
Observable.Range(0, int.MaxValue)
是冷可观察量。每次订阅,都会重新评价。您可以通过使用 Publish()
发布它来使 observable 变热,然后订阅它,然后 Connect()
然后 Wait()
。如果在最后一个元素已经生成后调用 Wait()
,这确实会增加获得 InvalidOperationException
的风险。更好的选择是 LastOrDefaultAsync()
.
那会给你这样的东西:
var connectable = query.Publish();
var subscription = connectable.Subscribe(...);
subscription = new CompositeDisposable(connectable.Connect(), subscription);
await connectable.LastOrDefaultAsync();
或者您可以直接使用 ToTask()
避免等待和 return 任务(请从您的方法签名中删除异步)。
return connectable.LastOrDefaultAsync().ToTask();
转换为任务后,可以用Wait()
同步等待(不要混淆Task.Wait()
和Observable.Wait()
)。
connectable.LastOrDefaultAsync().ToTask().Wait();
但是,您很可能根本不想等待!在异步上下文中等待毫无意义。你应该做什么它把序列完成后需要 运行 的剩余代码放在订阅的 OnComplete()
部分。如果您有(清理)代码需要 运行,即使您取消订阅(处置),请考虑 Observable.Using
或 Finally(...)
方法以确保此代码是 运行。
如前所述,Observable.Range
重复的原因是您订阅了两次 - 一次是 .Subscribe(...)
,一次是 .Wait()
。
在这种情况下,我会使用非常简单的阻塞调用来获取值。只需这样做:
var results = query.ToArray().Wait();
.ToArray()
将多值 IObservable<T>
变成单值 IObservable<T[]>
。 .Wait()
将其变为 T[]
。这是确保只有一个订阅、阻止和获取所有值的简单方法。
在您的情况下,您可能不需要所有值,但我认为这是养成的好习惯。