Observable.Range 被重复?

Observable.Range being repeated?

Rx 的新手 -- 我有一个序列似乎正常运行,除了它似乎重复这一事实。

我想我在调用 Select()SelectMany() 时遗漏了一些触发范围重新评估的东西。

代码说明和我想做什么

下面的代码有问题

我认为这与导致范围从 0 开始重新计算的调用有关,但我不知道它是什么。

代码

var query = Observable.Range(0, int.MaxValue)
    .Select(pageNum =>
        {
            _etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
            return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
        })
    .TakeWhile(resProfList => resProfList.Any())
    .SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
    .Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
    .Merge(maxConcurrent: _processorSettings.ParallelProperties)
    .Do(async trackingRequests =>
    {
        await CreateRequests(trackingRequests.Result, createTrackingPayload);

        var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
        var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
        var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
            TrackingRecordRequestType.UpdateAssignmentType);

        _etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
            numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
    });

var subscription = query.Subscribe(
trackingRequests =>
{
    //Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
    _etlLogger.Info("Finished! Woohoo!");
});

await query.Wait();

这是因为您订阅了两次序列。一次在 query.Subscribe(...),一次在 query.Wait()

Observable.Range(0, int.MaxValue) 是冷可观察量。每次订阅,都会重新评价。您可以通过使用 Publish() 发布它来使 observable 变热,然后订阅它,然后 Connect() 然后 Wait()。如果在最后一个元素已经生成后调用 Wait(),这确实会增加获得 InvalidOperationException 的风险。更好的选择是 LastOrDefaultAsync().

那会给你这样的东西:

var connectable = query.Publish();
var subscription = connectable.Subscribe(...);
subscription = new CompositeDisposable(connectable.Connect(), subscription);
await connectable.LastOrDefaultAsync();

或者您可以直接使用 ToTask() 避免等待和 return 任务(请从您的方法签名中删除异步)。

return connectable.LastOrDefaultAsync().ToTask();

转换为任务后,可以用Wait()同步等待(不要混淆Task.Wait()Observable.Wait())。

connectable.LastOrDefaultAsync().ToTask().Wait();

但是,您很可能根本不想等待!在异步上下文中等待毫无意义。你应该做什么它把序列完成后需要 运行 的剩余代码放在订阅的 OnComplete() 部分。如果您有(清理)代码需要 运行,即使您取消订阅(处置),请考虑 Observable.UsingFinally(...) 方法以确保此代码是 运行。

如前所述,Observable.Range 重复的原因是您订阅了两次 - 一次是 .Subscribe(...),一次是 .Wait()

在这种情况下,我会使用非常简单的阻塞调用来获取值。只需这样做:

var results = query.ToArray().Wait();

.ToArray() 将多值 IObservable<T> 变成单值 IObservable<T[]>.Wait() 将其变为 T[]。这是确保只有一个订阅、阻止和获取所有值的简单方法。

在您的情况下,您可能不需要所有值,但我认为这是养成的好习惯。