缓存所有最后的不同值
Cache all last distinct values
我有一个带有键的不同值流(我可以在这里按键-字符分组):
XY - 其中 X 是一个键,Y 是一些其他数据
** - 参考点(见后面)
streamInput: -A1--B1----C1--A2-**-A3-A4--B2--->
我想要的是 Flux.cache() and Flux.distinct(keySelector) 的某种组合,我们称之为 method/pattern cacheDistinct().
因此,当新流将在参考点 **
订阅时,我将获得 **
之前的所有不同值和所有新值。
例如:
var cachedStream = streamInput.cacheDistinct();
//will print: -A1--B1----C1--A2--A3-A4--B2--->
cacheDistinct.subscribe(i -> print(i))
//user/event click and we open new subscription at reference time **
//will print: -B1-C1-A2--> (all last distinct before ** and all new) ---A3-A4--B2--->
//order before ** doesn't matter
cacheDistinct.subscribe(i -> print(i))
如何实现 cacheDistinct() method/pattern?
PS。我使用 project react,但我需要一个想法,所以你可以在其他 languages/frameworks 中展示你是如何反应的。
终于,我找到了解决办法。我会把它留给下一个寻求者(代码使用 project react v3.4,但我认为其他 languages/frameworks 它应该是相似的)。
基本上,我们必须缓存分组缓存流:
Flux<String> streamInput = Flux.just("A1", "B1", "C1", "A2", "A3", "A4", "B2")
.delayElements(Duration.ofSeconds(1))
.groupBy(i -> i.charAt(0)) //we group by our id
.map(i -> i.cache(1)) //we cache last element in each group
.cache() //we cache all grouped streams
.flatMap(i -> i); //we unpack streams to single stream
//TEST bellow
streamInput
.subscribeOn(Schedulers.parallel()) //prevent blocking main thread
.subscribe(i -> System.out.println("i1 = " + i));
Thread.sleep(5000); //simulate user event to open stream at reference time **
streamInput
.subscribeOn(Schedulers.parallel())
.subscribe(i -> System.out.println("i2 = " + i));
Thread.sleep(100000); //UGLY waiting for async code to be finished
我有一个带有键的不同值流(我可以在这里按键-字符分组):
XY - 其中 X 是一个键,Y 是一些其他数据
** - 参考点(见后面)
streamInput: -A1--B1----C1--A2-**-A3-A4--B2--->
我想要的是 Flux.cache() and Flux.distinct(keySelector) 的某种组合,我们称之为 method/pattern cacheDistinct().
因此,当新流将在参考点 **
订阅时,我将获得 **
之前的所有不同值和所有新值。
例如:
var cachedStream = streamInput.cacheDistinct();
//will print: -A1--B1----C1--A2--A3-A4--B2--->
cacheDistinct.subscribe(i -> print(i))
//user/event click and we open new subscription at reference time **
//will print: -B1-C1-A2--> (all last distinct before ** and all new) ---A3-A4--B2--->
//order before ** doesn't matter
cacheDistinct.subscribe(i -> print(i))
如何实现 cacheDistinct() method/pattern?
PS。我使用 project react,但我需要一个想法,所以你可以在其他 languages/frameworks 中展示你是如何反应的。
终于,我找到了解决办法。我会把它留给下一个寻求者(代码使用 project react v3.4,但我认为其他 languages/frameworks 它应该是相似的)。
基本上,我们必须缓存分组缓存流:
Flux<String> streamInput = Flux.just("A1", "B1", "C1", "A2", "A3", "A4", "B2")
.delayElements(Duration.ofSeconds(1))
.groupBy(i -> i.charAt(0)) //we group by our id
.map(i -> i.cache(1)) //we cache last element in each group
.cache() //we cache all grouped streams
.flatMap(i -> i); //we unpack streams to single stream
//TEST bellow
streamInput
.subscribeOn(Schedulers.parallel()) //prevent blocking main thread
.subscribe(i -> System.out.println("i1 = " + i));
Thread.sleep(5000); //simulate user event to open stream at reference time **
streamInput
.subscribeOn(Schedulers.parallel())
.subscribe(i -> System.out.println("i2 = " + i));
Thread.sleep(100000); //UGLY waiting for async code to be finished