如何在 RxJava2 中的自定义 Observable 中获得观察者处置操作的通知
How to get notified of a observer's dispose action in a custom Observable in RxJava2
In this thread,提出了一个问题,如何观察取消订阅事件,以便在取消订阅后清理并移除监听器。但是,在RxJava2中,上述线程的方法不再起作用了。
def myObservable = Observable.create({ aEmitter ->
val listener = {event ->
aEmitter.onNext(event);
}
existingEventSource.addListener(listener)
// Fails since aEmitter doesn't have an add() method nor does Subscriptions exist.
aEmitter.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})
在 RxJava2 中解决这个问题的正确方法是什么?
请查看 stringObservable Observable,如何处理订阅。
public class MyTest {
@Mock private MyService mock;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void nam3e() {
ArrayList<Listener> listeners = new ArrayList<>();
doAnswer(
invocation -> {
Object[] args = invocation.getArguments();
Listener arg = (Listener) args[0];
listeners.add(arg);
return null;
})
.when(mock)
.addListener(any());
Observable<String> stringObservable =
Observable.create(
e -> {
Listener listener =
s -> {
e.onNext(s);
};
mock.addListener(listener);
e.setCancellable(
() -> {
mock.removeListener(listener);
});
});
TestObserver<String> test = stringObservable.test();
Listener listener = listeners.get(0);
listener.onNext("Wurst");
test.assertNotComplete().assertValue("Wurst");
verify(mock, times(1)).addListener(any());
test.dispose();
verify(mock, times(1)).removeListener(any());
}
public interface MyService {
void addListener(Listener listener);
void removeListener(Listener listener);
}
@FunctionalInterface
public interface Listener {
void onNext(String s);
}
}
In this thread,提出了一个问题,如何观察取消订阅事件,以便在取消订阅后清理并移除监听器。但是,在RxJava2中,上述线程的方法不再起作用了。
def myObservable = Observable.create({ aEmitter ->
val listener = {event ->
aEmitter.onNext(event);
}
existingEventSource.addListener(listener)
// Fails since aEmitter doesn't have an add() method nor does Subscriptions exist.
aEmitter.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})
在 RxJava2 中解决这个问题的正确方法是什么?
请查看 stringObservable Observable,如何处理订阅。
public class MyTest {
@Mock private MyService mock;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void nam3e() {
ArrayList<Listener> listeners = new ArrayList<>();
doAnswer(
invocation -> {
Object[] args = invocation.getArguments();
Listener arg = (Listener) args[0];
listeners.add(arg);
return null;
})
.when(mock)
.addListener(any());
Observable<String> stringObservable =
Observable.create(
e -> {
Listener listener =
s -> {
e.onNext(s);
};
mock.addListener(listener);
e.setCancellable(
() -> {
mock.removeListener(listener);
});
});
TestObserver<String> test = stringObservable.test();
Listener listener = listeners.get(0);
listener.onNext("Wurst");
test.assertNotComplete().assertValue("Wurst");
verify(mock, times(1)).addListener(any());
test.dispose();
verify(mock, times(1)).removeListener(any());
}
public interface MyService {
void addListener(Listener listener);
void removeListener(Listener listener);
}
@FunctionalInterface
public interface Listener {
void onNext(String s);
}
}