如果其他 Observable 发出映射函数,则转换 Observable
Transform Observable if other Observables emmited mapping function
我正在创建一个游戏,其中有一个可观察到的事件流 X,代表制造商交付的产品。还有一些外部事件(我们称它们为变形金刚)会以各种方式在不同的时间段影响制造的性能。我想用其他 observables 来表示,这些 observables 发出一个转换 X 的函数,并且应该应用于每个 X,直到 Transformer 的 OnComplete。变形金刚的数量事先未知 - 它们是由于用户操作(如设备购买)或随机生成(如设备故障)而创建的。
我想我需要一个 IObservable<IObservable<Func<X,X>>>
,我必须 Join
(Zip
,还有什么?)和 IObservable<X>
才能做到这一点。你能帮我吗? Observable.CombineLatest
几乎是我所需要的,但它需要 IEnumerable<IObservable<T>>
.
如果我的描述不清楚,这里有一个弹珠图:
用更抽象的术语来说,我需要的非常类似于矩阵的转置,但我有 IObservable<IObservable<T>>
.
而不是 List<List<T>>
将变形金刚表示为流是否有意义?
既然添加一个新的Transformer只能对未来的Event进行改造,为什么不维护一些活跃的Transformer集合,然后当有新的Event进来时,就可以应用所有当前的Transformer呢?
当 Transformer 不再活跃时,它会从集合中删除或标记为不活跃。
假设你的转换器在 int
上工作并且你的 observables 是这样命名的:
IObservable<IObservable<Func<int, int>>> transformerObservables = null;
IObservable<int> values = null;
我会先将Observable of transformers的Observable转化为Array of Transformers的Observable,即
IObservable<IObservable<Func<int, int>>> -> IObservable<<Func<int, int>>[]>
首先,我们最终要在列表中添加和删除函数,并确保删除正确的转换器,我们必须覆盖 Func<...> 上的常用比较机制。所以我们...
var transformerArrayObservable = transformerObservables
// ...attach each transformer the index of the observable it came from:
.Select((transformerObservable, index) => transformerObservable
.Select(transformer => Tuple.Create(index, transformer))
// Then, materialize the transformer sequence so we get noticed when the sequence terminates.
.Materialize()
// Now the fun part: Make a scan, resulting in an observable of tuples
// that have the previous and current transformer
.Scan(new
{
Previous = (Tuple<int, Func<int, int>>)null,
Current = (Tuple<int, Func<int, int>>)null
},
(tuple, currentTransformer) => new
{
Previous = tuple.Current,
Current = currentTransformer.HasValue
? currentTransformer.Value
: (Tuple<int, Func<int, int>>)null
}))
// Merge these and do another scan, this time adding and removing
// the transformers from a list.
.Merge()
.Scan(
new Tuple<int, Func<int, int>>[0],
(array, tuple) =>
{
//Expensive! Consider taking a dependency on immutable collections here!
var list = array.ToList();
if (tuple.Previous != null)
list.Remove(tuple.Previous);
if (tuple.Current != null)
list.Add(tuple.Current);
return list.ToArray();
})
// Extract only the actual functions
.Select(x => x.Select(y => y.Item2).ToArray())
// Finally, to make sure that values are passed even when no transformer has been observed
// start this sequence with the neutral transformation.
// IMPORTANT: You should test what happens when the first value is oberserved very quickly. There might be timing issues.
.StartWith(Scheduler.Immediate, new[] { new Func<int, int>[0]});
现在,您将需要一个在 Rx 中不可用的运算符,称为 CombineVeryLatest。看看here.
var transformedValues = values
.CombineVeryLatest(transformerArrayObservable, (value, transformers) =>
{
return transformers
.Aggregate(value, (current, transformer) => transformer(current));
});
你应该完成了。我敢肯定,有一些性能可以获得,但你会明白的。
受到 this answer 的启发,我最终得到了这个:
Output = Input
.WithLatestFrom(
transformations.Transpose(),
(e, fs) => fs.Aggregate(e, (x, f) => f(x)))
.SelectMany(x => x)
.Publish();
其中 Transpose 和 WithLatestFrom 运算符定义为:
public static IObservable<IObservable<T>> Transpose<T>(this IObservable<IObservable<T>> source)
{
return Observable.Create<IObservable<T>>(o =>
{
var latestValues = new Dictionary<IObservable<T>, T>();
var result = new BehaviorSubject<IObservable<T>>(Observable.Empty<T>());
source.Subscribe(observable =>
{
observable.Subscribe(t =>
{
latestValues[observable] = t;
result.OnNext(latestValues.ToObservable().Select(kv => kv.Value));
}, () =>
{
latestValues.Remove(observable);
});
});
return result.Subscribe(o);
});
}
public static IObservable<R> WithLatestFrom<T, U, R>(
this IObservable<T> source,
IObservable<U> other,
Func<T, U, R> combine)
{
return Observable.Create<R>(o =>
{
var current = new BehaviorSubject<U>(default(U));
other.Subscribe(current);
return source.Select(s => combine(s, current.Value)).Subscribe(o);
});
}
这是检查行为的单元测试:
[TestMethod]
public void WithLatestFrom_ShouldNotDuplicateEvents()
{
var events = new Subject<int>();
var add1 = new Subject<Func<int, int>>();
var add2 = new Subject<Func<int, int>>();
var transforms = new Subject<IObservable<Func<int, int>>>();
var results = new List<int>();
events.WithLatestFrom(
transforms.Transpose(),
(e, fs) => fs.Aggregate(e, (x, f) => f(x)))
.SelectMany(x => x)
.Subscribe(results.Add);
events.OnNext(1);
transforms.OnNext(add1);
add1.OnNext(x => x + 1);
events.OnNext(1); // 1+1 = 2
transforms.OnNext(add2);
add2.OnNext(x => x + 2);
events.OnNext(1); // 1+1+2 = 4
add1.OnCompleted();
events.OnNext(1); // 1+2 = 3
add2.OnCompleted();
events.OnNext(1);
CollectionAssert.AreEqual(new int[] { 1, 2, 4, 3, 1 }, results);
}
哇,这真是令人费解,但我想我有一些有用的东西。首先,我创建了一个扩展方法来将 IObservable<IObservable<Func<T, T>>
转换为 IObservable<IEnumerable<Func<T, T>>
。扩展方法的运行假设每个可观察对象在完成之前只会产生一个 Func<T, T>
。
public static class MoreReactiveExtensions
{
public static IObservable<IEnumerable<Func<T, T>>> ToTransformations<T>(this IObservable<IObservable<Func<T, T>>> source)
{
return
Observable
// Yield an empty enumerable first.
.Repeat(Enumerable.Empty<Func<T, T>>(), 1)
// Then yield an updated enumerable every time one of
// the transformation observables yields a value or completes.
.Concat(
source
.SelectMany((x, i) =>
x
.Materialize()
.Select(y => new
{
Id = i,
Notification = y
}))
.Scan(
new List<Tuple<int, Func<T, T>>>(),
(acc, x) =>
{
switch(x.Notification.Kind)
{
// If an observable compeleted then remove
// its corresponding function from the accumulator.
case NotificationKind.OnCompleted:
acc =
acc
.Where(y => y.Item1 != x.Id)
.ToList();
break;
// If an observable yield a new Func then add
// it to the accumulator.
case NotificationKind.OnNext:
acc = new List<Tuple<int, Func<T, T>>>(acc)
{
Tuple.Create(x.Id, x.Notification.Value)
};
break;
// Do something with exceptions here.
default:
// Do something here
break;
}
return acc;
})
// Select an IEnumerable<Func<T, T>> here.
.Select(x => x.Select(y => y.Item2)));
}
}
然后,给定以下变量:
IObservable<IObservable<Func<int, int>>> transformationObservables
IObservable<int> products`
我是这样使用的:
var transformations =
transformationObservables
.ToTransformations()
.Publish()
.RefCount();
IObservable<int> transformedProducts=
transformations
.Join(
products,
t => transformations,
i => Observable.Empty<int>(),
(t, i) => t.Aggregate(i, (ii, tt) => tt.Invoke(ii)))
根据我的测试结果似乎是正确的。
我正在创建一个游戏,其中有一个可观察到的事件流 X,代表制造商交付的产品。还有一些外部事件(我们称它们为变形金刚)会以各种方式在不同的时间段影响制造的性能。我想用其他 observables 来表示,这些 observables 发出一个转换 X 的函数,并且应该应用于每个 X,直到 Transformer 的 OnComplete。变形金刚的数量事先未知 - 它们是由于用户操作(如设备购买)或随机生成(如设备故障)而创建的。
我想我需要一个 IObservable<IObservable<Func<X,X>>>
,我必须 Join
(Zip
,还有什么?)和 IObservable<X>
才能做到这一点。你能帮我吗? Observable.CombineLatest
几乎是我所需要的,但它需要 IEnumerable<IObservable<T>>
.
如果我的描述不清楚,这里有一个弹珠图:
用更抽象的术语来说,我需要的非常类似于矩阵的转置,但我有 IObservable<IObservable<T>>
.
List<List<T>>
将变形金刚表示为流是否有意义?
既然添加一个新的Transformer只能对未来的Event进行改造,为什么不维护一些活跃的Transformer集合,然后当有新的Event进来时,就可以应用所有当前的Transformer呢?
当 Transformer 不再活跃时,它会从集合中删除或标记为不活跃。
假设你的转换器在 int
上工作并且你的 observables 是这样命名的:
IObservable<IObservable<Func<int, int>>> transformerObservables = null;
IObservable<int> values = null;
我会先将Observable of transformers的Observable转化为Array of Transformers的Observable,即
IObservable<IObservable<Func<int, int>>> -> IObservable<<Func<int, int>>[]>
首先,我们最终要在列表中添加和删除函数,并确保删除正确的转换器,我们必须覆盖 Func<...> 上的常用比较机制。所以我们...
var transformerArrayObservable = transformerObservables
// ...attach each transformer the index of the observable it came from:
.Select((transformerObservable, index) => transformerObservable
.Select(transformer => Tuple.Create(index, transformer))
// Then, materialize the transformer sequence so we get noticed when the sequence terminates.
.Materialize()
// Now the fun part: Make a scan, resulting in an observable of tuples
// that have the previous and current transformer
.Scan(new
{
Previous = (Tuple<int, Func<int, int>>)null,
Current = (Tuple<int, Func<int, int>>)null
},
(tuple, currentTransformer) => new
{
Previous = tuple.Current,
Current = currentTransformer.HasValue
? currentTransformer.Value
: (Tuple<int, Func<int, int>>)null
}))
// Merge these and do another scan, this time adding and removing
// the transformers from a list.
.Merge()
.Scan(
new Tuple<int, Func<int, int>>[0],
(array, tuple) =>
{
//Expensive! Consider taking a dependency on immutable collections here!
var list = array.ToList();
if (tuple.Previous != null)
list.Remove(tuple.Previous);
if (tuple.Current != null)
list.Add(tuple.Current);
return list.ToArray();
})
// Extract only the actual functions
.Select(x => x.Select(y => y.Item2).ToArray())
// Finally, to make sure that values are passed even when no transformer has been observed
// start this sequence with the neutral transformation.
// IMPORTANT: You should test what happens when the first value is oberserved very quickly. There might be timing issues.
.StartWith(Scheduler.Immediate, new[] { new Func<int, int>[0]});
现在,您将需要一个在 Rx 中不可用的运算符,称为 CombineVeryLatest。看看here.
var transformedValues = values
.CombineVeryLatest(transformerArrayObservable, (value, transformers) =>
{
return transformers
.Aggregate(value, (current, transformer) => transformer(current));
});
你应该完成了。我敢肯定,有一些性能可以获得,但你会明白的。
受到 this answer 的启发,我最终得到了这个:
Output = Input
.WithLatestFrom(
transformations.Transpose(),
(e, fs) => fs.Aggregate(e, (x, f) => f(x)))
.SelectMany(x => x)
.Publish();
其中 Transpose 和 WithLatestFrom 运算符定义为:
public static IObservable<IObservable<T>> Transpose<T>(this IObservable<IObservable<T>> source)
{
return Observable.Create<IObservable<T>>(o =>
{
var latestValues = new Dictionary<IObservable<T>, T>();
var result = new BehaviorSubject<IObservable<T>>(Observable.Empty<T>());
source.Subscribe(observable =>
{
observable.Subscribe(t =>
{
latestValues[observable] = t;
result.OnNext(latestValues.ToObservable().Select(kv => kv.Value));
}, () =>
{
latestValues.Remove(observable);
});
});
return result.Subscribe(o);
});
}
public static IObservable<R> WithLatestFrom<T, U, R>(
this IObservable<T> source,
IObservable<U> other,
Func<T, U, R> combine)
{
return Observable.Create<R>(o =>
{
var current = new BehaviorSubject<U>(default(U));
other.Subscribe(current);
return source.Select(s => combine(s, current.Value)).Subscribe(o);
});
}
这是检查行为的单元测试:
[TestMethod]
public void WithLatestFrom_ShouldNotDuplicateEvents()
{
var events = new Subject<int>();
var add1 = new Subject<Func<int, int>>();
var add2 = new Subject<Func<int, int>>();
var transforms = new Subject<IObservable<Func<int, int>>>();
var results = new List<int>();
events.WithLatestFrom(
transforms.Transpose(),
(e, fs) => fs.Aggregate(e, (x, f) => f(x)))
.SelectMany(x => x)
.Subscribe(results.Add);
events.OnNext(1);
transforms.OnNext(add1);
add1.OnNext(x => x + 1);
events.OnNext(1); // 1+1 = 2
transforms.OnNext(add2);
add2.OnNext(x => x + 2);
events.OnNext(1); // 1+1+2 = 4
add1.OnCompleted();
events.OnNext(1); // 1+2 = 3
add2.OnCompleted();
events.OnNext(1);
CollectionAssert.AreEqual(new int[] { 1, 2, 4, 3, 1 }, results);
}
哇,这真是令人费解,但我想我有一些有用的东西。首先,我创建了一个扩展方法来将 IObservable<IObservable<Func<T, T>>
转换为 IObservable<IEnumerable<Func<T, T>>
。扩展方法的运行假设每个可观察对象在完成之前只会产生一个 Func<T, T>
。
public static class MoreReactiveExtensions
{
public static IObservable<IEnumerable<Func<T, T>>> ToTransformations<T>(this IObservable<IObservable<Func<T, T>>> source)
{
return
Observable
// Yield an empty enumerable first.
.Repeat(Enumerable.Empty<Func<T, T>>(), 1)
// Then yield an updated enumerable every time one of
// the transformation observables yields a value or completes.
.Concat(
source
.SelectMany((x, i) =>
x
.Materialize()
.Select(y => new
{
Id = i,
Notification = y
}))
.Scan(
new List<Tuple<int, Func<T, T>>>(),
(acc, x) =>
{
switch(x.Notification.Kind)
{
// If an observable compeleted then remove
// its corresponding function from the accumulator.
case NotificationKind.OnCompleted:
acc =
acc
.Where(y => y.Item1 != x.Id)
.ToList();
break;
// If an observable yield a new Func then add
// it to the accumulator.
case NotificationKind.OnNext:
acc = new List<Tuple<int, Func<T, T>>>(acc)
{
Tuple.Create(x.Id, x.Notification.Value)
};
break;
// Do something with exceptions here.
default:
// Do something here
break;
}
return acc;
})
// Select an IEnumerable<Func<T, T>> here.
.Select(x => x.Select(y => y.Item2)));
}
}
然后,给定以下变量:
IObservable<IObservable<Func<int, int>>> transformationObservables
IObservable<int> products`
我是这样使用的:
var transformations =
transformationObservables
.ToTransformations()
.Publish()
.RefCount();
IObservable<int> transformedProducts=
transformations
.Join(
products,
t => transformations,
i => Observable.Empty<int>(),
(t, i) => t.Aggregate(i, (ii, tt) => tt.Invoke(ii)))
根据我的测试结果似乎是正确的。