如何使用任务并行库中的 Task.Run() 启动冷可观察对象?

How to start a cold observable using Task.Run() from the task parallel library?

我们有这样一种情况,我们想要在 C# 应用程序中启动一个后台 "polling" 操作,该应用程序 returns 定期使用反应式扩展值。我们要实施的流程如下:

  1. 一个调用者调用一个类似 Poll() 的方法,该方法 returns 一个 IObservable
  2. 调用者订阅所述可观察对象,并启动一个后台 thread/task 与硬件交互以在某个时间间隔检索值
  3. 当调用者完成后,它会处理订阅并自动停止后台 thread/task

尝试 #1

为了证明这一点,我编写了以下控制台应用程序,但这并不符合我的预期:

public class OutputParameters
{
    public Guid Id { get; set; }
    public int Value { get; set; }
}

public class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Requesting the polling operation");
        var worker1 = Poll();

        Console.WriteLine("Subscribing to start the polling operation");

        var sub1 = worker1.Subscribe(
            value => { Console.WriteLine($"Thread {value.Id} emitted {value.Value}"); },
            ex => { Console.WriteLine($"Thread threw an exception: {ex.Message}"); },
            () => { Console.WriteLine("Thread has completed"); });


        Thread.Sleep(5000);

        sub1.Dispose();

        Console.ReadLine();
    }


    private static IObservable<OutputParameters> Poll()
    {
        return Observable.DeferAsync(Worker);
    }


    private static Task<IObservable<OutputParameters>> Worker(CancellationToken token)
    {
        var subject = new Subject<OutputParameters>();

        Task.Run(async () =>
        {
            var id = Guid.NewGuid();
            const int steps = 10;

            try
            {
                for (var i = 1; i <= steps || token.IsCancellationRequested; i++)
                {
                    Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}");
                    subject.OnNext(new OutputParameters { Id = id, Value = i });

                    // This will actually throw an exception if it's the active call when
                    //  the token is cancelled.
                    //
                    await Task.Delay(1000, token);
                }
            }
            catch (TaskCanceledException ex)
            {
                // Interestingly, if this is triggered because the caller unsibscribed then
                //  this is unneeded...the caller isn't listening for this error anymore
                //
                subject.OnError(ex);
            }

            if (token.IsCancellationRequested)
            {
                Console.WriteLine($"[IN THREAD] Thread {id} was cancelled");
            }
            else
            {
                Console.WriteLine($"[IN THREAD] Thread {id} exiting normally");
                subject.OnCompleted();
            }
        }, token);

        return Task.FromResult(subject.AsObservable());
    }
}

上面的代码实际上似乎几乎立即取消了后台任务,因为这是输出:

Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de: Step 1 of 10
Thread threw an exception: A task was canceled.
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de was cancelled

尝试#2

然后我尝试对 Worker 方法做一个小改动,使其异步并等待 Task.Run 调用,如下所示:

    private static async Task<IObservable<OutputParameters>> Worker(CancellationToken token)
    {
        var subject = new Subject<OutputParameters>();

        await Task.Run(async () =>
        {
            ...what happens in here is unchanged...
        }, token);

        return subject.AsObservable();
    }

虽然这里的结果让后台任务看起来具有完全控制权,因为它在被取消之前 运行 持续了大约 5 秒,但是订阅回调没有输出。这是完整的输出:

Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 1 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 2 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 3 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 4 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 5 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 6 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a was cancelled

我的问题

所以很明显我没有完全理解这里发生的事情,或者在这种情况下使用 DeferAsync 是 observable 的正确创建方法。

有没有合适的方法来实现这种方法?

如果仅 RX 解决方案就足够了,这就可以了。如果你问我更干净...

static IObservable<OutputParameters> Poll()
{
    const int steps = 10;
    return Observable.Defer<Guid>(() => Observable.Return(Guid.NewGuid()))
        .SelectMany(id => 
            Observable.Generate(1, i => i <= steps, i => i + 1, i => i, _ => TimeSpan.FromMilliseconds(1000))
                .ObserveOn(new EventLoopScheduler())
                .Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
                .Select(i => new OutputParameters { Id = id, Value = i })
        );
}

解释:

  • Generate 就像 Rx 的 for 循环。最后一个参数控制何时发出项目。这相当于你的 for 循环 + Task.Delay.
  • ObserveOn控制where/whenobservable被观察到。在这种情况下,EventLoopScheduler 将为每个订阅者启动一个新线程,并且来自该可观察对象的所有项目都将在新线程上被观察到。

来自 Enigmativity:

static IObservable<OutputParameters> Poll()
{
    const int steps = 10;
    return Observable.Defer<OutputParameters>(() =>
    {
        var id = Guid.NewGuid();
        return Observable.Generate(1, i => i <= steps, i => i + 1, i => i,
                _ => TimeSpan.FromMilliseconds(1000), new EventLoopScheduler())
            .Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
            .Select(i => new OutputParameters { Id = id, Value = i });
    });
}