我如何切换()两个流,并在它们之间发出中间事件?
How can I Switch() two streams, and emit an intermediate event between them?
我有一个流中的流。当一个流结束而下一个流开始时,我需要发出一个中间事件来连接它们。这需要在前一个流的最后一个事件之后发生,并且需要改变最后一个事件的状态。基本上,输出流需要如下所示:
Stream 1 - event 1
Stream 1 - event 2
Stream 1 - event 3 (Stream 1 completes)
* My bridging event (Mutation of event 3)
Stream 2 - event 1 etc
使用最后一个元素的单个串联突变简单地转换每个内部序列。
IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
.Select(innerSource => innerSource
.Publish(o => o.Concat(o.LastAsync().Select(mutation))))
.Concat();
请注意,这假设每个内部流总是至少有一个项目。
听起来您可能想要 Concat 而不是 Switch。听起来好像一个流结束了,你得到了一个桥接值,然后你挂接到下一个流。这比 Switch 更 Concat。
使用Switch,您可以获得:
- 流 A 开始
- 来自 A 的事件通过您的 observable 流出
- 流 B 开始 - 您的可观察对象取消订阅 A 并切换到 B
问题是,如果在旧流完成之前出现新流,则旧流上的 LastAsync 不会触发,您也不会获得桥接事件。
如果您使用的是 Concat,则在流 A 完成之前,您的可观察对象不会切换到流 B,如果您使用 Timothy Shields 的方法,您将在 A 的末尾获得变异的最后一个值。
但是,如果您的情况是您确实希望在新流到达时立即切换,但您希望在那个时候进行桥接事件,那么这就有点棘手了。它可能必须是 Timothy Shields 答案的改编版本,例如:
IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
.Select(innerSource => innerSource
.Publish(o => {
// end this stream when source pushes another value
var stream = o.TakeUntil(source);
return stream.Concat(stream.LastAsync().Select(mutation)));
})
.Concat();
可能有 better/cleaner 种方法可以做到这一点。对于 Rx,总是值得用几种不同的方式尝试看似复杂的事情,因为有时一种方式比其他方式简单得多。
我有一个流中的流。当一个流结束而下一个流开始时,我需要发出一个中间事件来连接它们。这需要在前一个流的最后一个事件之后发生,并且需要改变最后一个事件的状态。基本上,输出流需要如下所示:
Stream 1 - event 1
Stream 1 - event 2
Stream 1 - event 3 (Stream 1 completes)
* My bridging event (Mutation of event 3)
Stream 2 - event 1 etc
使用最后一个元素的单个串联突变简单地转换每个内部序列。
IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
.Select(innerSource => innerSource
.Publish(o => o.Concat(o.LastAsync().Select(mutation))))
.Concat();
请注意,这假设每个内部流总是至少有一个项目。
听起来您可能想要 Concat 而不是 Switch。听起来好像一个流结束了,你得到了一个桥接值,然后你挂接到下一个流。这比 Switch 更 Concat。
使用Switch,您可以获得:
- 流 A 开始
- 来自 A 的事件通过您的 observable 流出
- 流 B 开始 - 您的可观察对象取消订阅 A 并切换到 B
问题是,如果在旧流完成之前出现新流,则旧流上的 LastAsync 不会触发,您也不会获得桥接事件。
如果您使用的是 Concat,则在流 A 完成之前,您的可观察对象不会切换到流 B,如果您使用 Timothy Shields 的方法,您将在 A 的末尾获得变异的最后一个值。
但是,如果您的情况是您确实希望在新流到达时立即切换,但您希望在那个时候进行桥接事件,那么这就有点棘手了。它可能必须是 Timothy Shields 答案的改编版本,例如:
IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
.Select(innerSource => innerSource
.Publish(o => {
// end this stream when source pushes another value
var stream = o.TakeUntil(source);
return stream.Concat(stream.LastAsync().Select(mutation)));
})
.Concat();
可能有 better/cleaner 种方法可以做到这一点。对于 Rx,总是值得用几种不同的方式尝试看似复杂的事情,因为有时一种方式比其他方式简单得多。