Rx 跳过,直到经过几秒
Rx skip until a number of seconds have passed
我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。
例子
- observable A 每秒发出一个"tick"
- observable B 应该每 5 秒只发出这个 "tick",忽略
介于两者之间。
也就是说,跳过每个项目,直到某个时间跨度过去。
这不是去抖动,因为去抖动需要原始 Observable 停留 5 秒没有发射,但我的每秒发射一次.
请看看我的解决方案。 'tickEverySecond' 将每秒发出一个值。 'result' 将收集 'tickEverySecond' 在 time-window 5 秒内发出的所有项目。每 5 秒 window 来自 'tickEverySecond' 的最后一个发射值将被推送给订阅者。
@Test
public void name() {
TestScheduler testScheduler = new TestScheduler();
Flowable<Long> tickEverySecond = Flowable.interval(0, 1, TimeUnit.SECONDS, testScheduler);
Flowable<Long> result = tickEverySecond.window(5, TimeUnit.SECONDS, testScheduler).flatMap(longFlowable -> longFlowable.takeLast(1));
TestSubscriber<Long> test = result.test();
testScheduler.advanceTimeTo(4, TimeUnit.SECONDS);
test.assertValueCount(0).assertNotComplete();
testScheduler.advanceTimeTo(5, TimeUnit.SECONDS);
test.assertValue(4L).assertNotComplete();
testScheduler.advanceTimeTo(9, TimeUnit.SECONDS);
test.assertValues(4L).assertNotComplete();
testScheduler.advanceTimeTo(10, TimeUnit.SECONDS);
test.assertValues(4L, 9L).assertNotComplete();
testScheduler.advanceTimeTo(14_999, TimeUnit.MILLISECONDS);
test.assertValues(4L, 9L).assertNotComplete();
}
您可以使用以下运算符获得相同的效果:
Flowable<Long> result = tickEverySecond.throttleLast(5, TimeUnit.SECONDS, testScheduler);
我认为 .window() 运算符是您所需要的 -
https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Observables/Window.swift
它按时间段对事件进行分区,请参阅 RxJava 文档 -
http://reactivex.io/RxJava/javadoc/rx/Observable.html#window-long-long-java.util.concurrent.TimeUnit-
我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。
例子
- observable A 每秒发出一个"tick"
- observable B 应该每 5 秒只发出这个 "tick",忽略 介于两者之间。
也就是说,跳过每个项目,直到某个时间跨度过去。
这不是去抖动,因为去抖动需要原始 Observable 停留 5 秒没有发射,但我的每秒发射一次.
请看看我的解决方案。 'tickEverySecond' 将每秒发出一个值。 'result' 将收集 'tickEverySecond' 在 time-window 5 秒内发出的所有项目。每 5 秒 window 来自 'tickEverySecond' 的最后一个发射值将被推送给订阅者。
@Test
public void name() {
TestScheduler testScheduler = new TestScheduler();
Flowable<Long> tickEverySecond = Flowable.interval(0, 1, TimeUnit.SECONDS, testScheduler);
Flowable<Long> result = tickEverySecond.window(5, TimeUnit.SECONDS, testScheduler).flatMap(longFlowable -> longFlowable.takeLast(1));
TestSubscriber<Long> test = result.test();
testScheduler.advanceTimeTo(4, TimeUnit.SECONDS);
test.assertValueCount(0).assertNotComplete();
testScheduler.advanceTimeTo(5, TimeUnit.SECONDS);
test.assertValue(4L).assertNotComplete();
testScheduler.advanceTimeTo(9, TimeUnit.SECONDS);
test.assertValues(4L).assertNotComplete();
testScheduler.advanceTimeTo(10, TimeUnit.SECONDS);
test.assertValues(4L, 9L).assertNotComplete();
testScheduler.advanceTimeTo(14_999, TimeUnit.MILLISECONDS);
test.assertValues(4L, 9L).assertNotComplete();
}
您可以使用以下运算符获得相同的效果:
Flowable<Long> result = tickEverySecond.throttleLast(5, TimeUnit.SECONDS, testScheduler);
我认为 .window() 运算符是您所需要的 - https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Observables/Window.swift
它按时间段对事件进行分区,请参阅 RxJava 文档 - http://reactivex.io/RxJava/javadoc/rx/Observable.html#window-long-long-java.util.concurrent.TimeUnit-