RxJava - ConnectableObservable,断开和重新连接
RxJava - ConnectableObservable, disconnecting and reconnecting
我正在尝试从“断开连接”部分复制示例代码 here。
Disconnecting
As we saw in connect's signature, this method returns a Subscription, just like Observable.subscribe does. You can use that reference to terminate the ConnectableObservable's subscription. That will stop events from being propagated to observers but it will not unsubscribe them from the ConnectableObservable. If you call connect again, the ConnectableObservable will start a new subscription and the old observers will begin receiving values again.
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
connectable.subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
输出
0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...
使用 RxJava 2.0.8,我有:
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Disposable s = connectable.connect();
connectable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d("test", "Num: " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Closing connection");
s.dispose();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Reconnecting...");
connectable.connect();
输出
Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...
提前致谢....
RxJava 似乎没有采用这种行为。工作示例来自 Rx.NET。参见 https://github.com/ReactiveX/RxJava/issues/4771
我正在尝试从“断开连接”部分复制示例代码 here。
Disconnecting
As we saw in connect's signature, this method returns a Subscription, just like Observable.subscribe does. You can use that reference to terminate the ConnectableObservable's subscription. That will stop events from being propagated to observers but it will not unsubscribe them from the ConnectableObservable. If you call connect again, the ConnectableObservable will start a new subscription and the old observers will begin receiving values again.
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
connectable.subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
输出
0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...
使用 RxJava 2.0.8,我有:
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Disposable s = connectable.connect();
connectable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d("test", "Num: " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Closing connection");
s.dispose();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Reconnecting...");
connectable.connect();
输出
Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...
提前致谢....
RxJava 似乎没有采用这种行为。工作示例来自 Rx.NET。参见 https://github.com/ReactiveX/RxJava/issues/4771