我如何根据每个元素的条件循环可观察序列?
How can i loop the observable sequence based on condition from each element?
我有一个序列,我需要根据每个元素的条件重复它。例如,如果某个元素标有 "failed" 标志,我需要对其进行重新处理。我的问题是我找不到如何执行 while-loop
操作。
TakeWhile
几乎是我需要的,但不会重复。
/*
* The following lines are just an example to comprehend the idea
*/
var observableSequence = sequence.ToObservable();
observableSequence
//This ´DoWhile´ did not worked because does not accept each element as argument
//and sequence at this point is not the same as `observableSequence`
.DoWhile(() => sequence.Any(item => !item.Failed))
.Where(item => item.Failed == true) //OK here i could put another condition for limited retries...
.Subscribe(item => {
try{
//Do stuff...
//. . .
item.Failed = false;
} catch
{
item.Failed = true;
}
});
我建议将原始序列与一个新的 Observable 合并,当对象失败时将其输入。
var retries = new ReplaySubject<Foo>();
var loopSequence = sequence.ToObservable().Merge(retries);
loopSequence
.Where(item => item.Failed)
.Subscribe(item =>
{
try{
//Do stuff
item.Failed = false;
} catch
{
item.Failed = true;
}
retries.OnNext(item);
});
通常认为更改可观察对象中对象的状态是不好的做法,因此您可能需要考虑创建转换:
loopSequence
.Where(item => item.Failed)
.Subscribe(item =>
{
try{
//Do stuff
retries.OnNext(new Item { ..., Failed = false });
} catch
{
retries.OnNext(new Item { ..., Failed = true });
}
});
你也应该非常小心这个模式,因为一个不断失败的项目会让你的程序执行进入一种无限循环。
您可以包装每个值并独立重试操作:
observableSequence
.SelectMany(item =>
Observable.Return(item)
.Select(x => //Do Stuff)
//Optional argument, omitting retries infinitely until
//success
.Retry(3)
)
.Subscribe(item => {
//Handle the results
});
可选地 Return
将 IScheduler
作为其第二个参数,这可能会改变重试的处理顺序(递归与蹦床)。
我有一个序列,我需要根据每个元素的条件重复它。例如,如果某个元素标有 "failed" 标志,我需要对其进行重新处理。我的问题是我找不到如何执行 while-loop
操作。
TakeWhile
几乎是我需要的,但不会重复。
/*
* The following lines are just an example to comprehend the idea
*/
var observableSequence = sequence.ToObservable();
observableSequence
//This ´DoWhile´ did not worked because does not accept each element as argument
//and sequence at this point is not the same as `observableSequence`
.DoWhile(() => sequence.Any(item => !item.Failed))
.Where(item => item.Failed == true) //OK here i could put another condition for limited retries...
.Subscribe(item => {
try{
//Do stuff...
//. . .
item.Failed = false;
} catch
{
item.Failed = true;
}
});
我建议将原始序列与一个新的 Observable 合并,当对象失败时将其输入。
var retries = new ReplaySubject<Foo>();
var loopSequence = sequence.ToObservable().Merge(retries);
loopSequence
.Where(item => item.Failed)
.Subscribe(item =>
{
try{
//Do stuff
item.Failed = false;
} catch
{
item.Failed = true;
}
retries.OnNext(item);
});
通常认为更改可观察对象中对象的状态是不好的做法,因此您可能需要考虑创建转换:
loopSequence
.Where(item => item.Failed)
.Subscribe(item =>
{
try{
//Do stuff
retries.OnNext(new Item { ..., Failed = false });
} catch
{
retries.OnNext(new Item { ..., Failed = true });
}
});
你也应该非常小心这个模式,因为一个不断失败的项目会让你的程序执行进入一种无限循环。
您可以包装每个值并独立重试操作:
observableSequence
.SelectMany(item =>
Observable.Return(item)
.Select(x => //Do Stuff)
//Optional argument, omitting retries infinitely until
//success
.Retry(3)
)
.Subscribe(item => {
//Handle the results
});
可选地 Return
将 IScheduler
作为其第二个参数,这可能会改变重试的处理顺序(递归与蹦床)。