Xamarin 中的 SubscribeOn / ObserveOn 示例 Android

Example with SubscribeOn / ObserveOn in Xamarin Android

我正在尝试在 Andorid Xamarin 中使用 RX。
你知道怎么用吗:
Observable.SubscribeOn(..) 声明使用默认池中的线程来执行后台任务 Observable.ObserveOn(..) 将事件从 Observable 重定向到 UI thread

根据 SubscribeOn 我尝试了所有的 ISchedulers 来自
System.Reactive.PlatformServices.dll / System.Reactive.Concurrency
即:

NewThreadScheduler.Default
TaskPoolScheduler.Default
ThreadPoolScheduler.Instance

没有任何效果。

另一方面[如果可能]我不想手动:
- 在 Observable
中创建我的线程 - 在 Observer

中使用 RunOnUiThread

===更新===

NewThreadScheduler.Default
的测试结果 代码:

Console.WriteLine("creating ole");
var ole = Observable.Create<string>(suber =>
{
   Console.WriteLine("inside ole");
   Thread.Sleep(5000);
   suber.OnNext("point1");
   suber.OnCompleted();
   Console.WriteLine("ole completed");
   return Disposable.Create(() => Console.WriteLine("observer unsubscribed"));
});
ole.SubscribeOn(NewThreadScheduler.Default);
Console.WriteLine("subscribing");
oleSub = ole.Subscribe(s => Console.WriteLine("result: " + s));
Console.WriteLine("subscribed");

输出:

creating ole
subscribing
inside ole
result: point1
ole completed
observer unsubscribed
subscribed

结论:
可观察内容在主线程中执行,尽管预计 运行 它自己的线程:ole.SubscribeOn(NewThreadScheduler.Default);

Rx 基于函数式编程,其中一个关键租户是无副作用编程。

当您创建 var ole = Observable.Create... 时,您正在创建一个可观察序列。当有东西订阅它时,这个序列会有行为。

当您随后尝试在 ole.SubscribeOn(NewThreadScheduler.Default); 上设置订阅时,您正在使用 SubscribeOn 行为来装饰 ole 可观察序列,但是因为这是一个无副作用操作,它 returns 一个新的可观察序列。您不会将返回的实例分配给任何东西。即 ole.SubscribeOn(NewThreadScheduler.Default); 行代码什么都不做。

然后您返回原始未修改的 ole 可观察序列并订阅它。

我有两个建议

  1. 创建一个 Logging 辅助运算符以消除代码中的所有 Console.Write 噪音。这对您在调试 Rx 代码时很有用,还可以让您学习如何创建 Rx 运算符(没有副作用)。 https://github.com/LeeCampbell/RxCookbook/blob/34ac4f3536b00bbe259384d3bf0e8746da3311cc/Instrumentation/Logging.md
  2. 链接你的方法,这样你就能真正得到你想要的行为。

这里我们恰当地使用了SubscribeOn方法。我们还使用 Log 扩展方法,您可以从上面的 link 中自己编写。

var ole = Observable.Create<string>(obs=>
{       
    Thread.Sleep(5000); //Dont use Thread.Sleep and Rx :-)
    obs.OnNext("point1");
    obs.OnCompleted();
    return Disposable.Empty;
});
var oleSubscription = ole
    .Log("ole")
    .SubscribeOn(NewThreadScheduler.Default)
    .ObserveOn(/*What ever Android's UI Thread Scheduler is*/)
    .Subscribe(s => Console.WriteLine("result: " + s));

另请注意,我也添加了 ObserveOn 运算符。我强烈建议这两种方法的用户仅在最终订阅者(可能是您的 ViewModel?)上使用它们,并且仅将它们用作上述 Subscribe 方法之前的最后一个运算符。

更多帮助: