如何同步 Observables 并卸载 UI 线程

How to synchronize Observables and offload UI Thread

我有两个订阅相同源的简单观察处理程序。然而,两种订阅都在不同的类型上运行。我希望他们保持可观察源 (Subject()) 的顺序。我尝试使用 Synchronize() 扩展,但我没有找到一种方法来按预期完成这项工作。

这是我的单元测试代码:

[Test]
public void TestObserveOn()
{
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    var source = new Subject<object>();
    var are = new AutoResetEvent(false);

    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
        o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                int sleep = 3000 / o; // just to simulate longer processing
                Thread.Sleep(sleep);
                Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
            },
        () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
                    o =>
                    {
                        Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                        Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
                    },
                    () =>
                    {
                        Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                    }))
    {
        Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        source.OnNext(1);
        source.OnNext(1.1);
        source.OnNext(2);
        source.OnNext(2.1);
        source.OnNext(3);
        source.OnNext(3.1);
        source.OnCompleted();

        Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        are.WaitOne();
    }
}

测试代码的结果输出:

Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled  1 on threadId: 11
Received 1,1 on threadId:12
Handled  1,1 on threadId: 12
Received 2,1 on threadId:12
Handled  2,1 on threadId: 12
Received 3,1 on threadId:12
Handled  3,1 on threadId: 12
Received 2 on threadId:11
Handled  2 on threadId: 11
OnCompleted on threadId:12
Received 3 on threadId:11
Handled  3 on threadId: 11
OnCompleted on threadId:11

如您所见,顺序与输入不同。我想同步两个订阅,以便顺序与输入相同。

输出应该是

Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled  1 on threadId: 11
Received 1,1 on threadId:12
Handled  1,1 on threadId: 12
Received 2 on threadId:11
Handled  2 on threadId: 11
Received 2,1 on threadId:12
Handled  2,1 on threadId: 12
Received 3 on threadId:11
Handled  3 on threadId: 11
Received 3,1 on threadId:12
Handled  3,1 on threadId: 12
OnCompleted on threadId:11
OnCompleted on threadId:12

(完成顺序对我来说没那么重要)

编辑:

我还尝试了以下方法:

[Test]
public void TestObserveOn()
{
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    var source = new Subject<object>();
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
    var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler);
    var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory);
    var are = new AutoResetEvent(false);

    using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
        o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                int sleep = 3000 / o;
                Thread.Sleep(sleep);
                Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
            },
        () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
    using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
                    o =>
                    {
                        Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                        Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
                    },
                    () =>
                    {
                        Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                        are.Set();
                    }))
    {
        Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        source.OnNext(1);
        source.OnNext(1.1);
        source.OnNext(2);
        source.OnNext(2.1);
        source.OnNext(3);
        source.OnNext(3.1);
        source.OnCompleted();

        Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        are.WaitOne();
        are.WaitOne();
    }
}

但是输出还是错误的:

Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:4
Handled  1 on threadId: 4
Received 2 on threadId:4
Handled  2 on threadId: 4
Received 3 on threadId:4
Handled  3 on threadId: 4
OnCompleted on threadId:4
Received 1,1 on threadId:4
Handled  1,1 on threadId: 4
Received 2,1 on threadId:4
Handled  2,1 on threadId: 4
Received 3,1 on threadId:4
Handled  3,1 on threadId: 4
OnCompleted on threadId:4

...如您所见,它不符合 OnNext() 调用的顺序。

这在使用具有类似创建含义的类型然后进行多次更新时尤其重要...如果更新在创建之前怎么办?如果不能保证顺序,您可能会遇到问题或需要排队 "future" 事件,直到它们的前任与要更改的状态同步。 您需要类似递增的 version/order 数字来将其用作排序标准并找到 "holes" 并将后继者排队,直到他们再次排队。

第二次编辑 ...更接近我的问题并摆脱测试用例理论:

我想要一个易于使用 RX 过滤功能的简单界面:

public interface ICommandBus // or to say Aggregator pattern
{
    void Send<T>(T command) where T : ICommand; // might be something like Task<Result> Send<T>(T command) to know the system has accepted the command

    IObservable<T> Stream<T>() where T : ICommand;
}

public class CommandBus : ICommandBus, IDisposable
{
    private static readonly ILog Log = LogManager.GetLogger<CommandBus>();

    private readonly HashSet<Type> registrations = new HashSet<Type>();

    private readonly Subject<ICommand> stream = new Subject<ICommand>();

    private readonly IObservable<ICommand> notifications;

    private bool disposed;

    public CommandBus()
    {
        // hmm, this is a problem!? how to sync?
        this.notifications = this.stream.SubscribeOn(TaskPoolScheduler.Default);

    }

    public IObservable<T> Stream<T>() where T : ICommand
    {
        var observable = this.notifications.OfType<T>();
        return new ExclusiveObservableWrapper<T>(
            observable,
            t => this.registrations.Add(t),
            t => this.registrations.Remove(t));
    }

    public void Send<T>(T command) where T : ICommand
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }

        if (!this.registrations.Contains(typeof(T)))
        {
            throw new NoCommandHandlerSubscribedException();
        }

        Log.Debug(logm => logm("Sending command of type {0}.", typeof(T).Name));

        this.stream.OnNext(command);
    }

    //public async Task SendAsync<T>(T command) where T : ICommand
    //{
    //    if (command == null)
    //    {
    //        throw new ArgumentNullException("command");
    //    }

    //    if (!this.registrations.Contains(typeof(T)))
    //    {
    //        throw new NoCommandHandlerSubscribedException();
    //    }

    //    Log.Debug(logm => logm("Sending command of type {0}.", typeof(T)));

    //    this.stream.OnNext(command);

    //    await this.stream.Where(item => ReferenceEquals(item, command));
    //}

    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
            {
                this.stream.Dispose();
            }
        }

        this.disposed = true;
    }

    [Serializable]
    public class CommandAlreadySubscribedException : Exception
    {
        internal CommandAlreadySubscribedException(Type type)
            : base(string.Format("Tried to subscribe handler for command of type {0} but there was already a subscribtion. More than one handler at time is not allowed.", type))
        {
        }

        protected CommandAlreadySubscribedException(SerializationInfo info, StreamingContext context)
            : base(info, context)
        {
        }
    }

    [Serializable]
    public class NoCommandHandlerSubscribedException : Exception
    {
        public NoCommandHandlerSubscribedException()
        {
        }

        public NoCommandHandlerSubscribedException(string message)
            : base(message)
        {
        }

        public NoCommandHandlerSubscribedException(string message, Exception innerException)
            : base(message, innerException)
        {
        }

        protected NoCommandHandlerSubscribedException(SerializationInfo info, StreamingContext context)
            : base(info, context)
        {
        }
    }

    private class ExclusiveObservableWrapper<T> : IObservable<T> where T : ICommand
    {
        private readonly IObservable<T> observable;

        private readonly Func<Type, bool> register;

        private readonly Action<Type> unregister;

        internal ExclusiveObservableWrapper(IObservable<T> observable, Func<Type, bool> register, Action<Type> unregister)
        {
            this.observable = observable;
            this.register = register;
            this.unregister = unregister;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var subscription = this.observable.Subscribe(observer);
            var type = typeof(T);

            if (!this.register(type))
            {
                observer.OnError(new CommandAlreadySubscribedException(type));
            }

            return Disposable.Create(
                () =>
                {
                    subscription.Dispose();
                    this.unregister(type);
                });
        }
    }
}

如果我不能保证这些命令是按给定的顺序排列的,那么它们(可能)就没有意义。 (创建前更新)

ICommandBus 用于 UI/Presentation 层,它希望为命令调用相应的处理程序(无需知道处理程序)。

我想简单地将链卸载到一个单独的线程。

命令 -> 总线 -> 命令处理程序 -> 域模型 -> 事件 -> 事件处理程序 -> 读取模型

这需要保持命令出现的顺序。

我认为 RX 只需一些 "magic lines" 就能做到这一点。但据我所知,我现在必须用自己的线程处理再做一次。 :-(

您根据来源下一个成员的类型筛选,为 source 创建了两个不同的任务。

并行处理消息,正如您从线程 ID 中看到的那样。这为您提供了更好的性能,但无法保证 处理 source 的顺序 。因此,如果您需要对象的顺序句柄,则必须重写代码以进行顺序执行(这会降低性能)或使用其他调度程序进行测试。

目前您正在使用 TaskPoolScheduler.Default,它只使用默认线程池。因此,您可以提供 a new scheduler. You can provide it a new implementation by yourself, but I think that the most easiest way is to use ConcurrentExclusiveSchedulerPair class 来提供一个独占调度程序来处理您的 source,其顺序与您提供值的顺序相同。

您的代码可能是这样的:

var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler );
var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory);
using (source.ObserveOn(exclusiveScheduler)...

更新:

正如其他人 post 所说,处理此类事件的正确方法是 EventLoopScheduler class.

你好像对.Synchronize()的作用理解有误。它的唯一目的是获取一个产生重叠或错位消息的可观察对象(即 OnCompletedOnNext 或多个 OnError 之前)并确保它们遵循 OnNext*(OnError|OnCompleted) 行为契约。这是关于让流氓 observable 玩得很好。

现在,既然我们可以忽略它,因为您的示例输入是一个行为良好的可观察对象,那么您可以通过调用 .ObserveOn(TaskPoolScheduler.Default) 看到您正在创建可观察的跳转线程——这很容易导致可观察对象被以不同的速率消耗 - 这就是这里发生的情况。

您已经订阅了 source 两次,因此您无法停止在引入并发的情况下看到的行为。

鉴于您之前的问题 (),您似乎一心想使用 Rx 来添加并发性,但随后又以某种方式强行将其删除。你真的应该以释放 Rx 的心态去做它的事情而不是高贵它。

@Beachwalker 编辑:

Enigmativity 在对这个答案的评论中给出了我的问题的正确答案

我必须使用EventLoopScheduler。所以我接受这个作为正确答案。

为了完整性。这是有效的代码:

[Test]
public void TestObserveOn()
{
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    var source = new Subject<object>();
    var exclusiveScheduler = new EventLoopScheduler();
    var are = new AutoResetEvent(false);

    using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
        o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                int sleep = 3000 / o;
                Thread.Sleep(sleep);
                Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
            },
        () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
    using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
                    o =>
                        {
                            Console.WriteLine(
                                "Received {1} on threadId:{0}",
                                Thread.CurrentThread.ManagedThreadId,
                                o);
                            Console.WriteLine(
                                "Handled  {1} on threadId: {0}",
                                Thread.CurrentThread.ManagedThreadId,
                                o);
                        },
                    () =>
                    {
                        Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                        are.Set();
                    }))
    {
        Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        source.OnNext(1);
        source.OnNext(1.1);
        source.OnNext(2);
        source.OnNext(2.1);
        source.OnNext(3);
        source.OnNext(3.1);
        source.OnCompleted();

        Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        are.WaitOne();
        are.WaitOne();
    }
}