RxJS socket.io 使用 retryWhen() 重新连接

RxJS socket.io reconnection with retryWhen()

我正在尝试在 socket.io 断开连接时重新订阅套接字事件-> 使用 RxJS 重新连接。

以下是我目前所拥有内容的精简版。我用一个 Observable 包装 socket.io,它被另一个过滤所有套接字事件的 Observable 包装。我想在套接字重新连接后使用 retryWhen,但不确定如何完成此操作...

谁能给我指出正确的方向?

function socket() {
    return Observable.create(subscriber => {
        let socket = io('my_socket_path');
        let _onevent = socket.onevent;

        socket.onevent = (packet) => {
            subscriber.next(packet);
            _onevent.call(socket, packet);
        };

        socket.on('disconnect', (data) => {
            subscriber.error(data);
        });

        return () => {
            socket.close();
        };
    }).share();
}
function getSocketEvent(event) {
    const socket = socket();
    return Observable.create(subscriber => {
        const eventSubscription = socket
            .filter(packet => packet.data[0] === event)
            .subscribe(packet => {
                subscriber.next(packet);
            });

        return () => {
            eventSubscription.unsubscribe();
        };
    }).share()
      .retryWhen(
       //Not sure how to trigger re-subscription here when socket reconnects
    ));
}
getSocketEvent('myGreatEvent').subscribe(data => {
    console.log(data);
});

谢谢

更新:

我注意到当我像这样在套接字的订阅中传递订阅者时调用了 retryWhen():

...
 const eventSubscription = socket
        .filter(packet => packet.data[0] === event)
        .subscribe(subscriber); //now triggers retryWhen()...
...

即使有了这个新发现,我也不确定为什么仅仅通过订阅者就会触发重试时间,以及当套接字重新联机时如何重新订阅。

作为新手,我发现了我的问题并回答了我自己的问题,以防它对某人有所帮助...

事实证明这很简单,我需要传递订阅中的所有处理程序。

.subscribe(
   (packet) => subscriber.next(packet),
   (error) => subscriber.error(error),
   () => console.log('completed')
);