关闭多播 Observable
Shutdown multicasted Observable
我创建了一个 Observable,它被 publish
ed,并被许多观察者订阅。
我想任意关闭它。我知道如果我这样做 refCount
,它会自动关闭,当所有观察者都取消订阅时,但我想手动关闭,而不存储和遍历所有 Disposables
您可以在 connect
返回的 Disposable
上调用 dispose
,但请注意,这可能会使观察者处于活动状态,因为它们不会收到任何进一步的事件。无论如何,你也必须处理掉它们。
ConnectableObservable co = source.publish();
Disposable d = co.connect();
Disposable d1 = co.subscribe();
Disposable d2 = co.subscribe();
d.dispose();
d1.dispose();
d2.dispose();
如果要避免悬挂部分,请使用 takeUntil
和主题:
PublishSubject terminate = PublishSubject.create();
ConnectableObservable co = source.publish();
Disposable d = co.connect();
terminate.doOnComplete(d::dispose).subscribe();
Observable observable = co.takeUntil(terminate);
observable .subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done 1"));
observable .subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done 2"));
terminate.onComplete();
我创建了一个 Observable,它被 publish
ed,并被许多观察者订阅。
我想任意关闭它。我知道如果我这样做 refCount
,它会自动关闭,当所有观察者都取消订阅时,但我想手动关闭,而不存储和遍历所有 Disposables
您可以在 connect
返回的 Disposable
上调用 dispose
,但请注意,这可能会使观察者处于活动状态,因为它们不会收到任何进一步的事件。无论如何,你也必须处理掉它们。
ConnectableObservable co = source.publish();
Disposable d = co.connect();
Disposable d1 = co.subscribe();
Disposable d2 = co.subscribe();
d.dispose();
d1.dispose();
d2.dispose();
如果要避免悬挂部分,请使用 takeUntil
和主题:
PublishSubject terminate = PublishSubject.create();
ConnectableObservable co = source.publish();
Disposable d = co.connect();
terminate.doOnComplete(d::dispose).subscribe();
Observable observable = co.takeUntil(terminate);
observable .subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done 1"));
observable .subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done 2"));
terminate.onComplete();