ToObservable 在某些情况下不会触发

ToObservable is not firing in some cases

RX默认是同步的所以我们可以确认一下

            int j = 0;
            Observable.Range(1, 2)
                .SelectMany(i => {
                    return new[]{1}.ToObservable()
                            .Select(i1 => {
                                new[]{1}.ToObservable().Subscribe(i2 => j = 1);
                                return 0;
                            })
                        ;
                })
                .Subscribe();
            j.ShouldBe(1);

但是在我的代码库中,我有一个类似的查询,除非我使用立即调度程序,否则它不会触发。

 public static IObservable<GitHubIssue> Save(this IObservable<IReadOnlyList<Issue>> source,  IGitHubRepository repository){
            var objectSpace = repository.ObjectSpace;
                return source.SelectMany(list => list.ToObservable().Select(issue => {
                    var gitHubIssue = objectSpace.CreateObject<GitHubIssue>();
                    gitHubIssue.Id = issue.Id;
                    issue.Labels.ToObservable(Scheduler.Immediate).Select(label => {
                        var gitHubLabel =objectSpace.GetObjectsQuery<GitHubLabel>(true).FirstOrDefault(_ => label.Name == _.Name) ??
                                         objectSpace.NewGitHubLabel(label);
                        gitHubIssue.Labels.Add(gitHubLabel);
                        return gitHubLabel;
                    }).Subscribe();
                    //previous selector is not executed 

我看不出其中的关系以及为什么会这样

"RX is synchronous by default" - 不,不是。每个运算符都有自己的默认值。

Observable.Range为例。这是当您不提供 Scheduler:

时的实现
public virtual IObservable<int> Range(int start, int count)
{
    return Range_(start, count, SchedulerDefaults.Iteration);
}

依次使用:

internal static IScheduler Iteration
{
    get
    {
        return CurrentThreadScheduler.Instance;
    }
}

如果我把Observable.Timer作为对位,我有这个代码:

public virtual IObservable<long> Timer(TimeSpan dueTime)
{
    return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
}

其中使用:

internal static IScheduler TimeBasedOperations
{
    get
    {
        return DefaultScheduler.Instance;
    }
}

您必须输入 .ToObservable(Scheduler.Immediate) 的事实表示您有一个默认情况下不使用 Scheduler.Immediate 的运算符。

现在,忽略所有这些,你永远不应该在一个可观察的管道内做的是订阅另一个可观察的。永远不能。当您这样做时,您依赖于副作用,这就是您的代码中出现的问题。

您应该始终假设对 Subscribe 的任何调用都会在将来的某个时间运行,因此即使您断言 j.ShouldBe(1) 也不应该在 Subscribe 之后使用。

您的示例代码应该更像这样:

int j = 0;
Observable
    .Range(1, 2)
    .SelectMany(i =>
    {
        return
            new[] { 1 }
                .ToObservable()
                .Select(i1 =>
                {
                    return 1;
                })
            ;
    })
    .Subscribe(x => 
    {
        j = x;
        /* j only valid here */
    });

/* j NOT valid here */

副作用的理性具体例子:

int j = 0;
Observable
    .Delay(Observable.Return(42), TimeSpan.FromSeconds(2.0))
    .Do(x => j = x)
    .Subscribe();
Console.WriteLine(j);

最终 j 将等于 42,但调用 Console.WriteLine(j) 时则不然。永远不要依赖在可观察对象内更新的可观察对象之外的状态。