合并多个流,保持排序并避免重复

Merging multiple streams, keeping ordering and avoiding duplicates

我有一个问题,我不知道如何用 RX 完美地处理。 我 有多个流 据说都包含相同的元素 但是每个流可能会丢失消息(涉及UDP)或者与其他流相比late/early。这些消息中的每一条都有一个序列号

现在我想要实现的是从所有这些流中获取一个流, 没有重复并保持消息顺序 。换句话说,同一个序列号不应该出现两次,它们的值只能增加,不能减少。 当一条消息在所有流上丢失时,我可以接受丢失它(因为涉及另一种 TCP 机制,允许我明确询问丢失的消息)。

我希望在 RxJava 中做到这一点,但我想我的问题并非特定于 Java。

这是一个弹珠图,可帮助直观显示我想要实现的目标: marble diagram

您可以在该图中看到我们正在等待第一个流上的 2 从第二个流输出 3。 同样,只有当我们从第二个流中收到 6 时,才会输出 6 因为只有在那时我们才能确定 5 永远不会被任何流接收。

不久前出现了一个类似的问题,I have a custom merge operator 当给定有序流时,它会按顺序合并它们,但不进行重复数据删除。

编辑:

如果你可以"afford"它,你可以使用这个自定义合并然后distinctUntilChanged(Func1)过滤掉具有相同序列号的后续消息。

Observable<Message> messages = SortedMerge.create(
    Arrays.asList(src1, src2, src3), (a, b) -> Long.compare(a.id, b.id))
.distinctUntilChanged(v -> v.id);

这是浏览器代码,但我认为它应该能让您很好地了解如何解决这个问题。

public static IObservable<T> Sequenced<T>(
    this IObservable<T> source,
    Func<T, int> getSequenceNumber,
    int sequenceBegin,
    int sequenceRedundancy)
{
    return Observable.Create(observer =>
    {
        // The next sequence number in order.
        var sequenceNext = sequenceBegin;

        // The key is the sequence number.
        // The value is (T, Count).
        var counts = new SortedDictionary<int, Tuple<T, int>>();
        return source.Subscribe(
            value =>
            {
                var sequenceNumber = getSequenceNumber(value);

                // If the sequence number for the current value is
                // earlier in the sequence, just throw away this value.
                if (sequenceNumber < sequenceNext)
                {
                    return;
                }

                // Update counts based on the current value.
                Tuple<T, int> count;
                if (!counts.TryGetValue(sequenceNumber, out count))
                {
                    count = Tuple.Create(value, 0);
                }
                count = Tuple.Create(count.Item1, count.Item2 + 1);
                counts[sequenceNumber] = count;

                // If the current count has reached sequenceRedundancy,
                // that means any seqeunce values S such that
                // sequenceNext < S < sequenceNumber and S has not been
                // seen yet will never be seen. So we emit everything
                // we have seen up to this point, in order.
                if (count.Item2 >= sequenceRedundancy)
                {
                    var removal = counts.Keys
                        .TakeWhile(seq => seq <= sequenceNumber)
                        .ToList();
                    foreach (var seq in removal)
                    {
                        count = counts[seq];
                        observer.OnNext(count.Item1);
                        counts.Remove(seq);
                    }
                    sequenceNext++;
                }

                // Emit stored values as long as we keep having the
                // next sequence value.
                while (counts.TryGetValue(sequenceNext, out count))
                {
                    observer.OnNext(count.Item1);
                    counts.Remove(sequenceNext);
                    sequenceNext++;
                }
            },
            observer.OnError,
            () =>
            {
                // Emit in order any remaining values.
                foreach (var count in counts.Values)
                {
                    observer.OnNext(count.Item1);
                }
                observer.OnCompleted();
            });
    });
}

如果您有两个流 IObservable<Message> AIObservable<Message> B,您可以通过 Observable.Merge(A, B).Sequenced(msg => msg.SequenceNumber, 1, 2).

使用它

对于您的弹珠图示例,它看起来如下所示,其中 source 列显示 Observable.Merge(A, B) 发出的值,counts 列显示 SortedDictionary算法的每一步之后。我假设原始源序列(没有任何丢失值)的 "messages" 是 (A,1), (B,2), (C,3), (D,4), (E,5 ), (F,6) 其中每条消息的第二个组成部分是其序列号。

source | counts
-------|-----------
 (A,1) | --> emit A
 (A,1) | --> skip
 (C,3) | (3,(C,1))
 (B,2) | (3,(C,1)) --> emit B,C and remove C
 (D,4) | --> emit D
 (F,6) | (6,(F,1))
 (F,6) | (6,(F,2)) --> emit F and remove