合并多个流,保持排序并避免重复
Merging multiple streams, keeping ordering and avoiding duplicates
我有一个问题,我不知道如何用 RX 完美地处理。
我 有多个流 据说都包含相同的元素
但是每个流可能会丢失消息(涉及UDP)或者与其他流相比late/early。这些消息中的每一条都有一个序列号。
现在我想要实现的是从所有这些流中获取一个流, 没有重复并保持消息顺序 。换句话说,同一个序列号不应该出现两次,它们的值只能增加,不能减少。
当一条消息在所有流上丢失时,我可以接受丢失它(因为涉及另一种 TCP 机制,允许我明确询问丢失的消息)。
我希望在 RxJava 中做到这一点,但我想我的问题并非特定于 Java。
这是一个弹珠图,可帮助直观显示我想要实现的目标:
marble diagram
您可以在该图中看到我们正在等待第一个流上的 2 从第二个流输出 3。
同样,只有当我们从第二个流中收到 6 时,才会输出 6 因为只有在那时我们才能确定 5 永远不会被任何流接收。
不久前出现了一个类似的问题,I have a custom merge operator 当给定有序流时,它会按顺序合并它们,但不进行重复数据删除。
编辑:
如果你可以"afford"它,你可以使用这个自定义合并然后distinctUntilChanged(Func1)
过滤掉具有相同序列号的后续消息。
Observable<Message> messages = SortedMerge.create(
Arrays.asList(src1, src2, src3), (a, b) -> Long.compare(a.id, b.id))
.distinctUntilChanged(v -> v.id);
这是浏览器代码,但我认为它应该能让您很好地了解如何解决这个问题。
public static IObservable<T> Sequenced<T>(
this IObservable<T> source,
Func<T, int> getSequenceNumber,
int sequenceBegin,
int sequenceRedundancy)
{
return Observable.Create(observer =>
{
// The next sequence number in order.
var sequenceNext = sequenceBegin;
// The key is the sequence number.
// The value is (T, Count).
var counts = new SortedDictionary<int, Tuple<T, int>>();
return source.Subscribe(
value =>
{
var sequenceNumber = getSequenceNumber(value);
// If the sequence number for the current value is
// earlier in the sequence, just throw away this value.
if (sequenceNumber < sequenceNext)
{
return;
}
// Update counts based on the current value.
Tuple<T, int> count;
if (!counts.TryGetValue(sequenceNumber, out count))
{
count = Tuple.Create(value, 0);
}
count = Tuple.Create(count.Item1, count.Item2 + 1);
counts[sequenceNumber] = count;
// If the current count has reached sequenceRedundancy,
// that means any seqeunce values S such that
// sequenceNext < S < sequenceNumber and S has not been
// seen yet will never be seen. So we emit everything
// we have seen up to this point, in order.
if (count.Item2 >= sequenceRedundancy)
{
var removal = counts.Keys
.TakeWhile(seq => seq <= sequenceNumber)
.ToList();
foreach (var seq in removal)
{
count = counts[seq];
observer.OnNext(count.Item1);
counts.Remove(seq);
}
sequenceNext++;
}
// Emit stored values as long as we keep having the
// next sequence value.
while (counts.TryGetValue(sequenceNext, out count))
{
observer.OnNext(count.Item1);
counts.Remove(sequenceNext);
sequenceNext++;
}
},
observer.OnError,
() =>
{
// Emit in order any remaining values.
foreach (var count in counts.Values)
{
observer.OnNext(count.Item1);
}
observer.OnCompleted();
});
});
}
如果您有两个流 IObservable<Message> A
和 IObservable<Message> B
,您可以通过 Observable.Merge(A, B).Sequenced(msg => msg.SequenceNumber, 1, 2)
.
使用它
对于您的弹珠图示例,它看起来如下所示,其中 source
列显示 Observable.Merge(A, B)
发出的值,counts
列显示 SortedDictionary
算法的每一步之后。我假设原始源序列(没有任何丢失值)的 "messages" 是 (A,1), (B,2), (C,3), (D,4), (E,5 ), (F,6) 其中每条消息的第二个组成部分是其序列号。
source | counts
-------|-----------
(A,1) | --> emit A
(A,1) | --> skip
(C,3) | (3,(C,1))
(B,2) | (3,(C,1)) --> emit B,C and remove C
(D,4) | --> emit D
(F,6) | (6,(F,1))
(F,6) | (6,(F,2)) --> emit F and remove
我有一个问题,我不知道如何用 RX 完美地处理。 我 有多个流 据说都包含相同的元素 但是每个流可能会丢失消息(涉及UDP)或者与其他流相比late/early。这些消息中的每一条都有一个序列号。
现在我想要实现的是从所有这些流中获取一个流, 没有重复并保持消息顺序 。换句话说,同一个序列号不应该出现两次,它们的值只能增加,不能减少。 当一条消息在所有流上丢失时,我可以接受丢失它(因为涉及另一种 TCP 机制,允许我明确询问丢失的消息)。
我希望在 RxJava 中做到这一点,但我想我的问题并非特定于 Java。
这是一个弹珠图,可帮助直观显示我想要实现的目标: marble diagram
您可以在该图中看到我们正在等待第一个流上的 2 从第二个流输出 3。 同样,只有当我们从第二个流中收到 6 时,才会输出 6 因为只有在那时我们才能确定 5 永远不会被任何流接收。
不久前出现了一个类似的问题,I have a custom merge operator 当给定有序流时,它会按顺序合并它们,但不进行重复数据删除。
编辑:
如果你可以"afford"它,你可以使用这个自定义合并然后distinctUntilChanged(Func1)
过滤掉具有相同序列号的后续消息。
Observable<Message> messages = SortedMerge.create(
Arrays.asList(src1, src2, src3), (a, b) -> Long.compare(a.id, b.id))
.distinctUntilChanged(v -> v.id);
这是浏览器代码,但我认为它应该能让您很好地了解如何解决这个问题。
public static IObservable<T> Sequenced<T>(
this IObservable<T> source,
Func<T, int> getSequenceNumber,
int sequenceBegin,
int sequenceRedundancy)
{
return Observable.Create(observer =>
{
// The next sequence number in order.
var sequenceNext = sequenceBegin;
// The key is the sequence number.
// The value is (T, Count).
var counts = new SortedDictionary<int, Tuple<T, int>>();
return source.Subscribe(
value =>
{
var sequenceNumber = getSequenceNumber(value);
// If the sequence number for the current value is
// earlier in the sequence, just throw away this value.
if (sequenceNumber < sequenceNext)
{
return;
}
// Update counts based on the current value.
Tuple<T, int> count;
if (!counts.TryGetValue(sequenceNumber, out count))
{
count = Tuple.Create(value, 0);
}
count = Tuple.Create(count.Item1, count.Item2 + 1);
counts[sequenceNumber] = count;
// If the current count has reached sequenceRedundancy,
// that means any seqeunce values S such that
// sequenceNext < S < sequenceNumber and S has not been
// seen yet will never be seen. So we emit everything
// we have seen up to this point, in order.
if (count.Item2 >= sequenceRedundancy)
{
var removal = counts.Keys
.TakeWhile(seq => seq <= sequenceNumber)
.ToList();
foreach (var seq in removal)
{
count = counts[seq];
observer.OnNext(count.Item1);
counts.Remove(seq);
}
sequenceNext++;
}
// Emit stored values as long as we keep having the
// next sequence value.
while (counts.TryGetValue(sequenceNext, out count))
{
observer.OnNext(count.Item1);
counts.Remove(sequenceNext);
sequenceNext++;
}
},
observer.OnError,
() =>
{
// Emit in order any remaining values.
foreach (var count in counts.Values)
{
observer.OnNext(count.Item1);
}
observer.OnCompleted();
});
});
}
如果您有两个流 IObservable<Message> A
和 IObservable<Message> B
,您可以通过 Observable.Merge(A, B).Sequenced(msg => msg.SequenceNumber, 1, 2)
.
对于您的弹珠图示例,它看起来如下所示,其中 source
列显示 Observable.Merge(A, B)
发出的值,counts
列显示 SortedDictionary
算法的每一步之后。我假设原始源序列(没有任何丢失值)的 "messages" 是 (A,1), (B,2), (C,3), (D,4), (E,5 ), (F,6) 其中每条消息的第二个组成部分是其序列号。
source | counts
-------|-----------
(A,1) | --> emit A
(A,1) | --> skip
(C,3) | (3,(C,1))
(B,2) | (3,(C,1)) --> emit B,C and remove C
(D,4) | --> emit D
(F,6) | (6,(F,1))
(F,6) | (6,(F,2)) --> emit F and remove