缓存所有最后的不同值

Cache all last distinct values

我有一个带有键的不同值流(我可以在这里按键-字符分组):

XY - 其中 X 是一个键,Y 是一些其他数据

** - 参考点(见后面)

streamInput: -A1--B1----C1--A2-**-A3-A4--B2--->

我想要的是 Flux.cache() and Flux.distinct(keySelector) 的某种组合,我们称之为 method/pattern cacheDistinct().

因此,当新流将在参考点 ** 订阅时,我将获得 ** 之前的所有不同值和所有新值。

例如:

var cachedStream = streamInput.cacheDistinct();

//will print: -A1--B1----C1--A2--A3-A4--B2--->
cacheDistinct.subscribe(i -> print(i)) 

//user/event click and we open new subscription at reference time **

//will print: -B1-C1-A2--> (all last distinct before ** and all new) ---A3-A4--B2--->
//order before ** doesn't matter
cacheDistinct.subscribe(i -> print(i))

如何实现 cacheDistinct() method/pattern?

PS。我使用 project react,但我需要一个想法,所以你可以在其他 languages/frameworks 中展示你是如何反应的。

终于,我找到了解决办法。我会把它留给下一个寻求者(代码使用 project react v3.4,但我认为其他 languages/frameworks 它应该是相似的)。

基本上,我们必须缓存分组缓存流:

    Flux<String> streamInput = Flux.just("A1", "B1", "C1", "A2", "A3", "A4", "B2")
            .delayElements(Duration.ofSeconds(1))
            .groupBy(i -> i.charAt(0))     //we group by our id
            .map(i -> i.cache(1))          //we cache last element in each group
            .cache()                       //we cache all grouped streams
            .flatMap(i -> i);              //we unpack streams to single stream

    //TEST bellow


    streamInput
            .subscribeOn(Schedulers.parallel()) //prevent blocking main thread
            .subscribe(i -> System.out.println("i1 = " + i));

    Thread.sleep(5000); //simulate user event to open stream at reference time **

    streamInput
            .subscribeOn(Schedulers.parallel())
            .subscribe(i -> System.out.println("i2 = " + i));

    Thread.sleep(100000);  //UGLY waiting for async code to be finished