取第一项但不完成

Take the first item and do not complete

有没有办法(操作员)shrink/enhance 下面的代码?

我想获取 Observable 的第一个 元素,但是延迟它的完成 直到源完成(这就是为什么我与 ignoreElements).

连接
public class SOTest {

    private final TestScheduler scheduler = new TestScheduler();

    @Test
    public void take_first_and_do_not_complete() {
        TestObserver<Long> test = Observable.interval(1, TimeUnit.SECONDS, scheduler)
                .take(7)
                .publish(o -> o
                        .firstElement()
                        .toObservable()
                        .concatWith(o
                                .doOnNext(e -> System.out.println("to be ignored: " + e))
                                .ignoreElements()
                        )
                )
                .doOnNext(e -> System.out.println("First is: " + e))
                .test();

        scheduler.advanceTimeTo(1, TimeUnit.SECONDS);

        test.assertValueCount(1);
        test.assertNotComplete();

        scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
        test.assertValueCount(1);
        test.assertNotComplete();

        scheduler.advanceTimeTo(7, TimeUnit.SECONDS);
        test.assertValueCount(1);
        test.assertComplete();
    }
}

怎么样.distinctUntilChanged()

Observable.interval(1, TimeUnit.SECONDS, scheduler)
    .take(7)
    .distinctUntilChanged((a, b) -> true)
    .test();

除了第一个项目之外的所有项目都被比较器阻止。上游完成时流完成。


编辑:

甚至更短 .distinct

Observable.interval(1, TimeUnit.SECONDS, scheduler)
    .take(7)
    .distinct(a -> 0)
    .test();