PublishSubject 为每个当前观察者调用映射

PublishSubject calls map for each present observer

我正在使用 PublishSubjectmap 运算符:

@Test
public void testMapWithMultipleObservers() {

    PublishSubject<Integer> subject = PublishSubject.create();
    Func1 action = spy(new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer integer) {
            return integer;
        }
    });
    Observable<Integer> observable = subject.asObservable().map(action);

    observable.subscribe(mock(Observer.class));
    observable.subscribe(mock(Observer.class));

    subject.onNext(1);

    verify(action, times(2)).call(anyInt());
    // however, I need it to be times(1)

}

期望的行为是在主体产生一个值后执行一个动作。我试过 doOnEachdoOnNextmap 并且在每种情况下都会为每个在场的观察者执行操作(对于 100 个观察者,操作将执行 100 次),而我需要它每次发射执行。

你有什么建议吗?

谢谢。

最快的选择是使用 share()

Observable<Integer> observable = 
    subject
        .map(action)
        .share();

您不需要 asObservable() 调用。它用于 return 来自 API 的主题,并防止调用者将其转换回主题。例如:

Observable<Integer> getSubjectAsObservable() {
    return subject.asObservable();
}