如何在可观察对象之间创建依赖关系?
How to create a dependency between observables?
我想要一个用于测试 Rx 组件的工具,它可以像这样工作:
给定指定为 'v seq
的事件顺序和关键选择器函数 (keySelector :: 'v -> 'k
) 我想创建一个 Map<'k, IObservable<'k>>
保证分组的可观察对象产生全局值由上述可枚举定义的顺序。
例如:
makeObservables isEven [1;2;3;4;5;6]
...应该产生
{ true : -2-4-6|,
false: 1-3-5| }
我的尝试是这样的:
open System
open System.Reactive.Linq
open FSharp.Control.Reactive
let subscribeAfter (o1: IObservable<'a>) (o2 : IObservable<'b>) : IObservable<'b> =
fun (observer : IObserver<'b>) ->
let tempObserver = { new IObserver<'a> with
member this.OnNext x = ()
member this.OnError e = observer.OnError e
member this.OnCompleted () = o2 |> Observable.subscribeObserver observer |> ignore
}
o1.Subscribe tempObserver
|> Observable.Create
let makeObservables (keySelector : 'a -> 'k) (xs : 'a seq) : Map<'k, IObservable<'a>> =
let makeDependencies : ('k * IObservable<'a>) seq -> ('k * IObservable<'a>) seq =
let makeDep ((_, o1), (k2, o2)) = (k2, subscribeAfter o1 o2)
Seq.pairwise
>> Seq.map makeDep
let makeObservable x = (keySelector x, Observable.single x)
let firstItem =
Seq.head xs
|> makeObservable
|> Seq.singleton
let dependentObservables =
xs
|> Seq.map makeObservable
|> makeDependencies
dependentObservables
|> Seq.append firstItem
|> Seq.groupBy fst
|> Seq.map (fun (k, obs) -> (k, obs |> Seq.map snd |> Observable.concatSeq))
|> Map.ofSeq
[<EntryPoint>]
let main argv =
let isEven x = (x % 2 = 0)
let splits : Map<bool, IObservable<int>> =
[1;2;3;4;5]
|> makeObservables isEven
use subscription =
splits
|> Map.toSeq
|> Seq.map snd
|> Observable.mergeSeq
|> Observable.subscribe (printfn "%A")
Console.ReadKey() |> ignore
0 // return an integer exit code
...但是结果不尽如人意,观测值不符合全局顺序。
显然每个组中的项目都正确生成但是当组合并时它更像是一个 concat 然后是一个合并
预期输出为:1 2 3 4 5
...但实际输出是 1 3 5 2 4
我做错了什么?
谢谢!
您描述想要这个:
{ true : -2-4-6|,
false: 1-3-5| }
但你真的在创造这个:
{ true : 246|,
false: 135| }
由于可观察对象中的项目之间没有时间间隔,merge
基本上具有恒定的竞争条件。 Rx 保证给定序列的元素 1 将在元素 2 之前触发,但是 Merge
不对这种情况提供任何保证。
如果您希望 Merge
能够按原始顺序重新排序,则需要在您的可观察对象中引入时间间隔。
我想要一个用于测试 Rx 组件的工具,它可以像这样工作:
给定指定为 'v seq
的事件顺序和关键选择器函数 (keySelector :: 'v -> 'k
) 我想创建一个 Map<'k, IObservable<'k>>
保证分组的可观察对象产生全局值由上述可枚举定义的顺序。
例如:
makeObservables isEven [1;2;3;4;5;6]
...应该产生
{ true : -2-4-6|,
false: 1-3-5| }
我的尝试是这样的:
open System
open System.Reactive.Linq
open FSharp.Control.Reactive
let subscribeAfter (o1: IObservable<'a>) (o2 : IObservable<'b>) : IObservable<'b> =
fun (observer : IObserver<'b>) ->
let tempObserver = { new IObserver<'a> with
member this.OnNext x = ()
member this.OnError e = observer.OnError e
member this.OnCompleted () = o2 |> Observable.subscribeObserver observer |> ignore
}
o1.Subscribe tempObserver
|> Observable.Create
let makeObservables (keySelector : 'a -> 'k) (xs : 'a seq) : Map<'k, IObservable<'a>> =
let makeDependencies : ('k * IObservable<'a>) seq -> ('k * IObservable<'a>) seq =
let makeDep ((_, o1), (k2, o2)) = (k2, subscribeAfter o1 o2)
Seq.pairwise
>> Seq.map makeDep
let makeObservable x = (keySelector x, Observable.single x)
let firstItem =
Seq.head xs
|> makeObservable
|> Seq.singleton
let dependentObservables =
xs
|> Seq.map makeObservable
|> makeDependencies
dependentObservables
|> Seq.append firstItem
|> Seq.groupBy fst
|> Seq.map (fun (k, obs) -> (k, obs |> Seq.map snd |> Observable.concatSeq))
|> Map.ofSeq
[<EntryPoint>]
let main argv =
let isEven x = (x % 2 = 0)
let splits : Map<bool, IObservable<int>> =
[1;2;3;4;5]
|> makeObservables isEven
use subscription =
splits
|> Map.toSeq
|> Seq.map snd
|> Observable.mergeSeq
|> Observable.subscribe (printfn "%A")
Console.ReadKey() |> ignore
0 // return an integer exit code
...但是结果不尽如人意,观测值不符合全局顺序。
显然每个组中的项目都正确生成但是当组合并时它更像是一个 concat 然后是一个合并
预期输出为:1 2 3 4 5
...但实际输出是 1 3 5 2 4
我做错了什么?
谢谢!
您描述想要这个:
{ true : -2-4-6|,
false: 1-3-5| }
但你真的在创造这个:
{ true : 246|,
false: 135| }
由于可观察对象中的项目之间没有时间间隔,merge
基本上具有恒定的竞争条件。 Rx 保证给定序列的元素 1 将在元素 2 之前触发,但是 Merge
不对这种情况提供任何保证。
如果您希望 Merge
能够按原始顺序重新排序,则需要在您的可观察对象中引入时间间隔。