Rx Merge + CombineLatest?
CombineLatest starts emitting only once both observables have emitted a value.
A 1----------2---------------
B -----a----------b---c------
C -----1a----2a---2b--2c----- C = A.CombineLatest(B)
Merge starts emitting as soon as either A or B emits. However, it cannot combine the latest values of A and B.
A 1----------2---------------
B -----a----------b---c------
C 1----a-----2----b---c------ C = A.Merge(B)
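The two diagrams above can be reproduced with a pair of subjects. This is only a minimal sketch, assuming the System.Reactive package is referenced; the list names `combined` and `merged` are illustrative, and A's integers are converted to strings because Merge needs a single element type.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class Program
{
    public static void Main()
    {
        var a = new Subject<int>();
        var b = new Subject<string>();

        var combined = new List<string>();
        var merged = new List<string>();

        // CombineLatest pairs the latest value of each source.
        a.CombineLatest(b, (x, y) => $"{x}{y}").Subscribe(combined.Add);
        // Merge forwards every value as it arrives, without pairing.
        a.Select(x => x.ToString()).Merge(b).Subscribe(merged.Add);

        // Same order of events as in the diagrams.
        a.OnNext(1);
        b.OnNext("a");
        a.OnNext(2);
        b.OnNext("b");
        b.OnNext("c");

        Console.WriteLine(string.Join(" ", combined)); // 1a 2a 2b 2c
        Console.WriteLine(string.Join(" ", merged));   // 1 a 2 b c
    }
}
```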
I need an operator that behaves like Merge, except that once both observables have started it combines the latest values of A and B:
A 1----------2---------------
B -----a----------b---c------
C 1----1a----2a---2b--2c----- C = A.MergeOrCombineLatest(B)
Its signature might look like this:
IObservable<C> MergeOrCombineLatest<A, B, C>(
    this IObservable<A> a,
    IObservable<B> b,
    Func<A, C> aResultSelector, // When A starts before B
    Func<B, C> bResultSelector, // When B starts before A
    Func<A, B, C> bothResultSelector) // When both A and B have started
How can I implement this operator?
First, pick a special value for each of A and B that your observables will never emit, for example null (which requires A and B to be reference types). Then:
public static IObservable<C> MergeOrCombineLatest<A, B, C>(
    this IObservable<A> a,
    IObservable<B> b,
    Func<A, C> aResultSelector, // When A starts before B
    Func<B, C> bResultSelector, // When B starts before A
    Func<A, B, C> bothResultSelector) // When both A and B have started
    where A : class
    where B : class
{
    // Sentinel values; the sources must never emit null themselves.
    A specialA = null;
    B specialB = null;
    return a.StartWith(specialA).CombineLatest(b.StartWith(specialB),
        (aval, bval) => {
            if (aval == specialA) return bval == specialB ? default(C) : bResultSelector(bval);
            if (bval == specialB) return aResultSelector(aval);
            return bothResultSelector(aval, bval);
        }
    ).Skip(1); // skip the first emission where both are special values
}
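If no value can safely be reserved as a sentinel (value types such as int, or sources that may legitimately emit null), the same idea can be expressed by tagging each value with a flag instead. The following is a sketch under that assumption, not the answer's own code: the tuple fields `Has`, `Value`, `Left` and `Right` are hypothetical names, and it filters out the "neither has started" combination with `Where` rather than `Skip(1)`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExtensions
{
    public static IObservable<C> MergeOrCombineLatest<A, B, C>(
        this IObservable<A> a,
        IObservable<B> b,
        Func<A, C> aResultSelector,
        Func<B, C> bResultSelector,
        Func<A, B, C> bothResultSelector)
    {
        // Tag every real value with Has = true and start each stream
        // with a placeholder whose Has flag is false.
        var aa = a.Select(x => (Has: true, Value: x))
                  .StartWith((Has: false, Value: default(A)));
        var bb = b.Select(y => (Has: true, Value: y))
                  .StartWith((Has: false, Value: default(B)));

        return aa.CombineLatest(bb, (x, y) => (Left: x, Right: y))
            // Drop combinations where neither source has produced a real value.
            .Where(p => p.Left.Has || p.Right.Has)
            .Select(p =>
                p.Left.Has && p.Right.Has
                    ? bothResultSelector(p.Left.Value, p.Right.Value)
                    : p.Left.Has
                        ? aResultSelector(p.Left.Value)
                        : bResultSelector(p.Right.Value));
    }
}

public static class Program
{
    public static void Main()
    {
        var a = new Subject<int>();
        var b = new Subject<string>();
        var results = new List<string>();

        a.MergeOrCombineLatest(b, x => $"{x}", y => y, (x, y) => $"{x}{y}")
         .Subscribe(results.Add);

        // Same order of events as in the question's diagram.
        a.OnNext(1);
        b.OnNext("a");
        a.OnNext(2);
        b.OnNext("b");
        b.OnNext("c");

        Console.WriteLine(string.Join(" ", results)); // 1 1a 2a 2b 2c
    }
}
```

Because the flag travels with each value, this variant needs no constraint on A or B, at the cost of allocating a small tuple per emission.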
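This works for me:
public static IObservable<C> MergeOrCombineLatest<A, B, C>(
    this IObservable<A> a,
    IObservable<B> b,
    Func<A, C> aResultSelector, // When A starts before B
    Func<B, C> bResultSelector, // When B starts before A
    Func<A, B, C> bothResultSelector) // When both A and B have started
{
    return
        // Publish shares a single subscription to each source among the inner queries.
        a.Publish(aa =>
            b.Publish(bb =>
                // xs carries the combined values once both sources have emitted.
                aa.CombineLatest(bb, bothResultSelector).Publish(xs =>
                    aa
                        .Select(aResultSelector)
                        .Merge(bb.Select(bResultSelector))
                        .TakeUntil(xs) // single-source results only until the first pair
                        .SkipLast(1)   // drop the single-source result of the value that completed the first pair
                        .Merge(xs))));
}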
Then this:
var a = new Subject<int>();
var b = new Subject<string>();
var C = a.MergeOrCombineLatest(b, x => $"{x}!!", y => $"{y}!!", (x, y) => $"{x}{y}");
C.Subscribe(x => Console.WriteLine(x));
b.OnNext("x");
b.OnNext("y");
b.OnNext("z");
a.OnNext(1);
a.OnNext(5);
a.OnNext(6);
b.OnNext("a");
a.OnNext(2);
b.OnNext("b");
b.OnNext("c");
...gives this:
x!!
y!!
z!!
1z
5z
6z
6a
2a
2b
2c