如果其他 Observable 发出映射函数,则转换 Observable

Transform Observable if other Observables emmited mapping function

我正在创建一个游戏,其中有一个可观察到的事件流 X,代表制造商交付的产品。还有一些外部事件(我们称它们为变形金刚)会以各种方式在不同的时间段影响制造的性能。我想用其他 observables 来表示,这些 observables 发出一个转换 X 的函数,并且应该应用于每个 X,直到 Transformer 的 OnComplete。变形金刚的数量事先未知 - 它们是由于用户操作(如设备购买)或随机生成(如设备故障)而创建的。

我想我需要一个 IObservable<IObservable<Func<X,X>>>,我必须 JoinZip,还有什么?)和 IObservable<X> 才能做到这一点。你能帮我吗? Observable.CombineLatest 几乎是我所需要的,但它需要 IEnumerable<IObservable<T>>.

如果我的描述不清楚,这里有一个弹珠图:

用更抽象的术语来说,我需要的非常类似于矩阵的转置,但我有 IObservable<IObservable<T>>.

而不是 List<List<T>>

将变形金刚表示为流是否有意义?

既然添加一个新的Transformer只能对未来的Event进行改造,为什么不维护一些活跃的Transformer集合,然后当有新的Event进来时,就可以应用所有当前的Transformer呢?

当 Transformer 不再活跃时,它会从集合中删除或标记为不活跃。

假设你的转换器在 int 上工作并且你的 observables 是这样命名的:

IObservable<IObservable<Func<int, int>>> transformerObservables = null;
IObservable<int> values = null;

我会先将Observable of transformers的Observable转化为Array of Transformers的Observable,即

IObservable<IObservable<Func<int, int>>> -> IObservable<<Func<int, int>>[]>

首先,我们最终要在列表中添加和删除函数,并确保删除正确的转换器,我们必须覆盖 Func<...> 上的常用比较机制。所以我们...

var transformerArrayObservable = transformerObservables
    // ...attach each transformer the index of the observable it came from:        
    .Select((transformerObservable, index) => transformerObservable
        .Select(transformer => Tuple.Create(index, transformer))
        // Then, materialize the transformer sequence so we get noticed when the sequence terminates.
        .Materialize()
        // Now the fun part: Make a scan, resulting in an observable of tuples
        // that have the previous and current transformer
        .Scan(new
        {
            Previous = (Tuple<int, Func<int, int>>)null,
            Current = (Tuple<int, Func<int, int>>)null
        },
        (tuple, currentTransformer) => new
        {
            Previous = tuple.Current,
            Current = currentTransformer.HasValue
                ? currentTransformer.Value
                : (Tuple<int, Func<int, int>>)null
        }))
        // Merge these and do another scan, this time adding and removing
        // the transformers from a list.
        .Merge()
        .Scan(
            new Tuple<int, Func<int, int>>[0],
            (array, tuple) =>
            {
                //Expensive! Consider taking a dependency on immutable collections here!
                var list = array.ToList();

                if (tuple.Previous != null)
                    list.Remove(tuple.Previous);

                if (tuple.Current != null)
                    list.Add(tuple.Current);

                return list.ToArray();
            })
            // Extract only the actual functions
        .Select(x => x.Select(y => y.Item2).ToArray())
        // Finally, to make sure that values are passed even when no transformer has been observed
        // start this sequence with the neutral transformation.
        // IMPORTANT: You should test what happens when the first value is oberserved very quickly. There might be timing issues.
        .StartWith(Scheduler.Immediate, new[] { new Func<int, int>[0]});

现在,您将需要一个在 Rx 中不可用的运算符,称为 CombineVeryLatest。看看here.

var transformedValues = values
    .CombineVeryLatest(transformerArrayObservable, (value, transformers) =>
    {
        return transformers
            .Aggregate(value, (current, transformer) => transformer(current));
    });

你应该完成了。我敢肯定,有一些性能可以获得,但你会明白的。

受到 this answer 的启发,我最终得到了这个:

        Output = Input
            .WithLatestFrom(
                transformations.Transpose(),
                (e, fs) => fs.Aggregate(e, (x, f) => f(x)))
            .SelectMany(x => x)
            .Publish();

其中 Transpose 和 WithLatestFrom 运算符定义为:

    public static IObservable<IObservable<T>> Transpose<T>(this IObservable<IObservable<T>> source)
    {
        return Observable.Create<IObservable<T>>(o =>
        {
            var latestValues = new Dictionary<IObservable<T>, T>();
            var result = new BehaviorSubject<IObservable<T>>(Observable.Empty<T>());

            source.Subscribe(observable =>
            {
                observable.Subscribe(t =>
                {
                    latestValues[observable] = t;
                    result.OnNext(latestValues.ToObservable().Select(kv => kv.Value));
                }, () =>
                {
                    latestValues.Remove(observable);
                });
            });

            return result.Subscribe(o);
        });
    }

    public static IObservable<R> WithLatestFrom<T, U, R>(
        this IObservable<T> source,
        IObservable<U> other,
        Func<T, U, R> combine)
    {
        return Observable.Create<R>(o =>
        {
            var current = new BehaviorSubject<U>(default(U));
            other.Subscribe(current);
            return source.Select(s => combine(s, current.Value)).Subscribe(o);
        });
    }

这是检查行为的单元测试:

    [TestMethod]
    public void WithLatestFrom_ShouldNotDuplicateEvents()
    {
        var events = new Subject<int>();

        var add1 = new Subject<Func<int, int>>();
        var add2 = new Subject<Func<int, int>>();
        var transforms = new Subject<IObservable<Func<int, int>>>();

        var results = new List<int>();

        events.WithLatestFrom(
                transforms.Transpose(),
                (e, fs) => fs.Aggregate(e, (x, f) => f(x)))
            .SelectMany(x => x)
            .Subscribe(results.Add);


        events.OnNext(1);
        transforms.OnNext(add1);
        add1.OnNext(x => x + 1);
        events.OnNext(1); // 1+1 = 2
        transforms.OnNext(add2);
        add2.OnNext(x => x + 2);
        events.OnNext(1); // 1+1+2 = 4
        add1.OnCompleted();
        events.OnNext(1); // 1+2 = 3
        add2.OnCompleted();
        events.OnNext(1);

        CollectionAssert.AreEqual(new int[] { 1, 2, 4, 3, 1 }, results);
    }

哇,这真是令人费解,但我想我有一些有用的东西。首先,我创建了一个扩展方法来将 IObservable<IObservable<Func<T, T>> 转换为 IObservable<IEnumerable<Func<T, T>>。扩展方法的运行假设每个可观察对象在完成之前只会产生一个 Func<T, T>

public static class MoreReactiveExtensions
{
    public static IObservable<IEnumerable<Func<T, T>>> ToTransformations<T>(this IObservable<IObservable<Func<T, T>>> source)
    {
        return
            Observable
            // Yield an empty enumerable first.
            .Repeat(Enumerable.Empty<Func<T, T>>(), 1)
            // Then yield an updated enumerable every time one of 
            // the transformation observables yields a value or completes.
            .Concat(                                    
                source
                .SelectMany((x, i) => 
                    x
                    .Materialize()
                    .Select(y => new 
                        { 
                            Id = i, 
                            Notification = y 
                        }))
                .Scan(
                    new List<Tuple<int, Func<T, T>>>(),
                    (acc, x) => 
                    {
                        switch(x.Notification.Kind)
                        {
                            // If an observable compeleted then remove
                            // its corresponding function from the accumulator.
                            case NotificationKind.OnCompleted:
                                acc = 
                                    acc
                                    .Where(y => y.Item1 != x.Id)
                                    .ToList();
                                break;
                            // If an observable yield a new Func then add
                            // it to the accumulator.
                            case NotificationKind.OnNext:
                                acc = new List<Tuple<int, Func<T, T>>>(acc) 
                                    { 
                                        Tuple.Create(x.Id, x.Notification.Value) 
                                    };
                                break;
                            // Do something with exceptions here.
                            default:
                                // Do something here
                                break;
                        }
                        return acc;
                    })
                // Select an IEnumerable<Func<T, T>> here.
                .Select(x => x.Select(y => y.Item2)));
    }
}

然后,给定以下变量:

IObservable<IObservable<Func<int, int>>> transformationObservables
IObservable<int> products`

我是这样使用的:

var transformations =
    transformationObservables
    .ToTransformations()
    .Publish()
    .RefCount();

IObservable<int> transformedProducts=
    transformations
    .Join(
        products,
        t => transformations,
        i => Observable.Empty<int>(),
        (t, i) => t.Aggregate(i, (ii, tt) => tt.Invoke(ii)))

根据我的测试结果似乎是正确的。