ToObservable 在某些情况下不会触发
ToObservable is not firing in some cases
RX默认是同步的所以我们可以确认一下
int j = 0;
Observable.Range(1, 2)
.SelectMany(i => {
return new[]{1}.ToObservable()
.Select(i1 => {
new[]{1}.ToObservable().Subscribe(i2 => j = 1);
return 0;
})
;
})
.Subscribe();
j.ShouldBe(1);
但是在我的代码库中,我有一个类似的查询,除非我使用立即调度程序,否则它不会触发。
public static IObservable<GitHubIssue> Save(this IObservable<IReadOnlyList<Issue>> source, IGitHubRepository repository){
var objectSpace = repository.ObjectSpace;
return source.SelectMany(list => list.ToObservable().Select(issue => {
var gitHubIssue = objectSpace.CreateObject<GitHubIssue>();
gitHubIssue.Id = issue.Id;
issue.Labels.ToObservable(Scheduler.Immediate).Select(label => {
var gitHubLabel =objectSpace.GetObjectsQuery<GitHubLabel>(true).FirstOrDefault(_ => label.Name == _.Name) ??
objectSpace.NewGitHubLabel(label);
gitHubIssue.Labels.Add(gitHubLabel);
return gitHubLabel;
}).Subscribe();
//previous selector is not executed
我看不出其中的关系以及为什么会这样
"RX is synchronous by default" - 不,不是。每个运算符都有自己的默认值。
以Observable.Range
为例。这是当您不提供 Scheduler
:
时的实现
public virtual IObservable<int> Range(int start, int count)
{
return Range_(start, count, SchedulerDefaults.Iteration);
}
依次使用:
internal static IScheduler Iteration
{
get
{
return CurrentThreadScheduler.Instance;
}
}
如果我把Observable.Timer
作为对位,我有这个代码:
public virtual IObservable<long> Timer(TimeSpan dueTime)
{
return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
}
其中使用:
internal static IScheduler TimeBasedOperations
{
get
{
return DefaultScheduler.Instance;
}
}
您必须输入 .ToObservable(Scheduler.Immediate)
的事实表示您有一个默认情况下不使用 Scheduler.Immediate
的运算符。
现在,忽略所有这些,你永远不应该在一个可观察的管道内做的是订阅另一个可观察的。永远不能。当您这样做时,您依赖于副作用,这就是您的代码中出现的问题。
您应该始终假设对 Subscribe
的任何调用都会在将来的某个时间运行,因此即使您断言 j.ShouldBe(1)
也不应该在 Subscribe
之后使用。
您的示例代码应该更像这样:
int j = 0;
Observable
.Range(1, 2)
.SelectMany(i =>
{
return
new[] { 1 }
.ToObservable()
.Select(i1 =>
{
return 1;
})
;
})
.Subscribe(x =>
{
j = x;
/* j only valid here */
});
/* j NOT valid here */
副作用的理性具体例子:
int j = 0;
Observable
.Delay(Observable.Return(42), TimeSpan.FromSeconds(2.0))
.Do(x => j = x)
.Subscribe();
Console.WriteLine(j);
最终 j
将等于 42
,但调用 Console.WriteLine(j)
时则不然。永远不要依赖在可观察对象内更新的可观察对象之外的状态。
RX默认是同步的所以我们可以确认一下
int j = 0;
Observable.Range(1, 2)
.SelectMany(i => {
return new[]{1}.ToObservable()
.Select(i1 => {
new[]{1}.ToObservable().Subscribe(i2 => j = 1);
return 0;
})
;
})
.Subscribe();
j.ShouldBe(1);
但是在我的代码库中,我有一个类似的查询,除非我使用立即调度程序,否则它不会触发。
public static IObservable<GitHubIssue> Save(this IObservable<IReadOnlyList<Issue>> source, IGitHubRepository repository){
var objectSpace = repository.ObjectSpace;
return source.SelectMany(list => list.ToObservable().Select(issue => {
var gitHubIssue = objectSpace.CreateObject<GitHubIssue>();
gitHubIssue.Id = issue.Id;
issue.Labels.ToObservable(Scheduler.Immediate).Select(label => {
var gitHubLabel =objectSpace.GetObjectsQuery<GitHubLabel>(true).FirstOrDefault(_ => label.Name == _.Name) ??
objectSpace.NewGitHubLabel(label);
gitHubIssue.Labels.Add(gitHubLabel);
return gitHubLabel;
}).Subscribe();
//previous selector is not executed
我看不出其中的关系以及为什么会这样
"RX is synchronous by default" - 不,不是。每个运算符都有自己的默认值。
以Observable.Range
为例。这是当您不提供 Scheduler
:
public virtual IObservable<int> Range(int start, int count)
{
return Range_(start, count, SchedulerDefaults.Iteration);
}
依次使用:
internal static IScheduler Iteration
{
get
{
return CurrentThreadScheduler.Instance;
}
}
如果我把Observable.Timer
作为对位,我有这个代码:
public virtual IObservable<long> Timer(TimeSpan dueTime)
{
return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
}
其中使用:
internal static IScheduler TimeBasedOperations
{
get
{
return DefaultScheduler.Instance;
}
}
您必须输入 .ToObservable(Scheduler.Immediate)
的事实表示您有一个默认情况下不使用 Scheduler.Immediate
的运算符。
现在,忽略所有这些,你永远不应该在一个可观察的管道内做的是订阅另一个可观察的。永远不能。当您这样做时,您依赖于副作用,这就是您的代码中出现的问题。
您应该始终假设对 Subscribe
的任何调用都会在将来的某个时间运行,因此即使您断言 j.ShouldBe(1)
也不应该在 Subscribe
之后使用。
您的示例代码应该更像这样:
int j = 0;
Observable
.Range(1, 2)
.SelectMany(i =>
{
return
new[] { 1 }
.ToObservable()
.Select(i1 =>
{
return 1;
})
;
})
.Subscribe(x =>
{
j = x;
/* j only valid here */
});
/* j NOT valid here */
副作用的理性具体例子:
int j = 0;
Observable
.Delay(Observable.Return(42), TimeSpan.FromSeconds(2.0))
.Do(x => j = x)
.Subscribe();
Console.WriteLine(j);
最终 j
将等于 42
,但调用 Console.WriteLine(j)
时则不然。永远不要依赖在可观察对象内更新的可观察对象之外的状态。