RxJava Subject not repeating with .repeat()?

Can anyone explain this behavior to me?

I've just started learning RxJava, and I want to publish my own events into an Observable chain. But I'm having trouble understanding how Subjects behave.

This code:

PublishSubject<String> subject = PublishSubject.create();
subject.repeat(3)
       .subscribe(s -> System.out.printf("subject emitted %s\n", s));

subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onNext("four");
subject.onCompleted();

Observable.just("one", "two", "three", "four")
          .repeat(3)
          .subscribe(s -> System.out.printf("observer emitted %s\n", s));

prints this:

subject emitted one
subject emitted two
subject emitted three
subject emitted four

observer emitted one
observer emitted two
observer emitted three
observer emitted four
observer emitted one
observer emitted two
observer emitted three
observer emitted four
observer emitted one
observer emitted two
observer emitted three
observer emitted four

Why is .repeat(3) ignored on the Subject?

Thanks!

After the first subscribe, repeat will not resubscribe to the PublishSubject until it receives onCompleted. You can use doOnSubscribe and doOnCompleted to debug this. For example:

    PublishSubject<String> subject = PublishSubject.create();
    subject.doOnSubscribe(() -> System.out.println("subject subscribe"))
            .doOnCompleted(() -> System.out.println("subject onCompleted"))
            .repeat(3)
            .subscribe(s -> System.out.printf("subject emitted %s\n", s));

    subject.onNext("one");
    subject.onNext("two");
    subject.onNext("three");
    subject.onNext("four");
    subject.onCompleted();

The execution order is as follows:

subscribe to PublishSubject
subject.onNext("one") => output "one"
subject.onNext("two") => output "two"
subject.onNext("three") => output "three"
subject.onNext("four") => output "four"
subject.onCompleted() => 
            output "onCompleted"
            subscribe to PublishSubject // repeat resubscribes: a new Observer is added while PublishSubject is terminating with `onCompleted`,
                                        // so this new Observer receives `onCompleted` at once and gets no items
            output "onCompleted"
            subscribe to PublishSubject // same again: the new Observer is added to the terminated PublishSubject,
                                        // so it receives `onCompleted` at once and gets no items
            output "onCompleted"

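In fact, if you want to output all the items 3 times, you can use ReplaySubject, which replays the items it has already seen to each new subscriber.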
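
To make the difference concrete without depending on the library, here is a small toy model in plain Java. It is only a sketch of the semantics described above, not RxJava's actual implementation: `ToySubject`, `repeat` and `RepeatModel` are hypothetical names invented for this illustration, and threading, unsubscription and error handling are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: hypothetical classes, NOT RxJava's real implementation.
final class RepeatModel {

    /** Minimal observer: receives items and a completion signal. */
    interface Observer {
        void onNext(String item);
        void onCompleted();
    }

    /** replay=false models a publish-style subject, replay=true a replay-style one. */
    static final class ToySubject {
        private final boolean replay;
        private final List<String> buffer = new ArrayList<>();
        private final List<Observer> observers = new ArrayList<>();
        private boolean completed = false;

        ToySubject(boolean replay) {
            this.replay = replay;
        }

        void subscribe(Observer o) {
            if (replay) {
                // A replay-style subject first hands over everything seen so far.
                for (String item : buffer) {
                    o.onNext(item);
                }
            }
            if (completed) {
                // A late subscriber to a terminated subject is completed at once.
                o.onCompleted();
            } else {
                observers.add(o);
            }
        }

        void onNext(String item) {
            if (replay) {
                buffer.add(item);
            }
            for (Observer o : new ArrayList<>(observers)) {
                o.onNext(item);
            }
        }

        void onCompleted() {
            completed = true;
            List<Observer> current = new ArrayList<>(observers);
            observers.clear();
            for (Observer o : current) {
                o.onCompleted();
            }
        }
    }

    /** Toy repeat: subscribe, and resubscribe on completion, for `times` subscriptions in total. */
    static void repeat(ToySubject source, int times, Consumer<String> sink) {
        if (times <= 0) {
            return;
        }
        source.subscribe(new Observer() {
            @Override
            public void onNext(String item) {
                sink.accept(item);
            }

            @Override
            public void onCompleted() {
                repeat(source, times - 1, sink);
            }
        });
    }

    /** Runs the scenario from the question and returns everything the subscriber received. */
    static List<String> run(boolean replay) {
        List<String> received = new ArrayList<>();
        ToySubject subject = new ToySubject(replay);
        repeat(subject, 3, received::add);
        subject.onNext("one");
        subject.onNext("two");
        subject.onNext("three");
        subject.onNext("four");
        subject.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println("publish-like: " + run(false).size() + " items"); // 4 items
        System.out.println("replay-like:  " + run(true).size() + " items");  // 12 items
    }
}
```

In the publish-like case the two resubscriptions only ever see the completion signal, so the four items appear once. In the replay-like case each resubscription is handed the buffered items again, which gives three passes. In the original RxJava 1.x snippet, the corresponding change would be to create the subject with ReplaySubject.create() instead of PublishSubject.create().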

Maybe you could use .asObservable() like this:

subject.asObservable().repeat(3)
       .subscribe(s -> System.out.printf("subject emitted %s\n", s));