Rx 跳过,直到经过几秒

Rx skip until a number of seconds have passed

我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。

例子

也就是说,跳过每个项目,直到某个时间跨度过去。

不是去抖动,因为去抖动需要原始 Observable 停留 5 秒没有发射,但我的每秒发射一次.

请看看我的解决方案。 'tickEverySecond' 将每秒发出一个值。 'result' 将收集 'tickEverySecond' 在 time-window 5 秒内发出的所有项目。每 5 秒 window 来自 'tickEverySecond' 的最后一个发射值将被推送给订阅者。

@Test
  public void name() {
    TestScheduler testScheduler = new TestScheduler();
    Flowable<Long> tickEverySecond = Flowable.interval(0, 1, TimeUnit.SECONDS, testScheduler);

    Flowable<Long> result = tickEverySecond.window(5, TimeUnit.SECONDS, testScheduler).flatMap(longFlowable -> longFlowable.takeLast(1));

    TestSubscriber<Long> test = result.test();

    testScheduler.advanceTimeTo(4, TimeUnit.SECONDS);
    test.assertValueCount(0).assertNotComplete();

    testScheduler.advanceTimeTo(5, TimeUnit.SECONDS);
    test.assertValue(4L).assertNotComplete();

    testScheduler.advanceTimeTo(9, TimeUnit.SECONDS);
    test.assertValues(4L).assertNotComplete();

    testScheduler.advanceTimeTo(10, TimeUnit.SECONDS);
    test.assertValues(4L, 9L).assertNotComplete();

    testScheduler.advanceTimeTo(14_999, TimeUnit.MILLISECONDS);
    test.assertValues(4L, 9L).assertNotComplete();
  }

您可以使用以下运算符获得相同的效果:

Flowable<Long> result = tickEverySecond.throttleLast(5, TimeUnit.SECONDS, testScheduler);

我认为 .window() 运算符是您所需要的 - https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Observables/Window.swift

它按时间段对事件进行分区,请参阅 RxJava 文档 - http://reactivex.io/RxJava/javadoc/rx/Observable.html#window-long-long-java.util.concurrent.TimeUnit-