RxJava - ConnectableObservable, disconnecting and reconnecting

I am trying to replicate the sample code from the "Disconnecting" section here:

Disconnecting

As we saw in connect's signature, this method returns a Subscription, just like Observable.subscribe does. You can use that reference to terminate the ConnectableObservable's subscription. That will stop events from being propagated to observers but it will not unsubscribe them from the ConnectableObservable. If you call connect again, the ConnectableObservable will start a new subscription and the old observers will begin receiving values again.

    ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
    Subscription s = connectable.connect();

    connectable.subscribe(i -> System.out.println(i));

    Thread.sleep(1000);
    System.out.println("Closing connection");
    s.unsubscribe();

    Thread.sleep(1000);
    System.out.println("Reconnecting");
    s = connectable.connect();

Output:

0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...

Using RxJava 2.0.8, I have:

    ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
    Disposable s = connectable.connect();

    connectable.subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            
        }

        @Override
        public void onNext(Long aLong) {
            Log.d("test", "Num: " + aLong);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.d("test", "Closing connection");
    s.dispose();

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.d("test", "Reconnecting...");
    connectable.connect();

Output:

Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...

Thanks in advance.

It appears that RxJava has not adopted this behavior. The working example comes from Rx.NET. See https://github.com/ReactiveX/RxJava/issues/4771
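
If the goal is simply to see values again after reconnecting, one possible workaround is to subscribe again, since a new subscription attaches to the current connection. The following is only a minimal sketch under a few assumptions: it uses the RxJava 2.x package names (`io.reactivex`), the class name `ReconnectSketch` and the `run()` helper are made up for illustration, and a `PublishSubject` stands in for `Observable.interval` so that the example needs no sleeping and produces a deterministic sequence.

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class ReconnectSketch {

    // Runs the disconnect/reconnect scenario and returns what each observer saw.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Assumption: a subject replaces interval() so no timing is involved.
        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> connectable = source.publish();

        connectable.subscribe(i -> log.add("first: " + i));
        Disposable connection = connectable.connect();
        source.onNext(0);
        source.onNext(1);

        // Equivalent of "Closing connection" in the question.
        connection.dispose();
        source.onNext(2); // nothing is connected, so this value is dropped

        // Reconnecting alone does not revive the first observer.
        connection = connectable.connect();
        source.onNext(3);

        // A fresh subscription attaches to the current connection.
        connectable.subscribe(i -> log.add("second: " + i));
        source.onNext(4);

        connection.dispose();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

On RxJava 2.x I would expect this to print `first: 0`, `first: 1` and `second: 4`: the first observer stays silent after the connection is disposed, which matches the output in the question, while the observer subscribed after reconnecting receives values again.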