对 rxjs 取消订阅做出反应

React to rxjs unsubscribe

我正在尝试使用 RXJS Observables 包装一个基于事件的 API,但我不知道如何在取消订阅发生后自行清理。

这几乎就是我将订阅与 Observable 和基于事件的同步的方式 API:

interface SomeEventService{
    registerListener(messageName: string, messageContent: Something): Listener;
    unregisterListener(listener: Listener);
}

class MyWrapper {
    private eventService: SomeEventService;

    streamOfSomething$(): Observable<Something> {
        return Observable.create((observer: Observer<Something>) => {
            // Create a listener, and relay events to the Observable
            const listener = this.eventService.registerListener("something", something => {
                observer.next(something);
            })
            // Now what?
        })
    }
}

这里的问题是,在未来的某个时间我想调用 eventService.unregisterListener,例如当 streamOfSomething$ 的消费者取消订阅流时。

问题:我该怎么做?据我所知,在取消订阅发生后,我无法在 observer 到 运行 代码上使用任何事件或回调。

class MyWrapper {
    private eventService: SomeEventService;

    streamOfSomething$(): Observable<Something> {
        return Observable.create((observer: Observer<Something>) => {
            // Create a listener, and relay events to the Observable
            const listener = this.eventService.registerListener("something", something => {
                observer.next(something);
            })
            // Now what?
             return () => { this.eventService.unRegisterListener(); };
        })
    }
}

当用户调用 unsubscribe() 时,您将 return 调用的函数