rxjs websocket - 互联网连接丢失
rxjs websocket - internet connection loss
如果客户端与 rxjs websocket 失去连接,我找不到管道重试的方法 class。
this.cexSocket = webSocket({ url: 'wss://ws.cex.io/ws/', WebSocketCtor: websocketModule.default });
this.socket = this.cexSocket
.pipe(retryWhen(errors => {
delay(1000);
return errors;
}))
.pipe(repeatWhen(complete => {
delay(1000);
return complete;
}))
.pipe(groupBy(({ e }) => e));
this.socket.subscribe(this.addEvents.bind(this));
setTimeout(this.checkTicks.bind(this), 5000);
如果客户端没有收到来自服务器的报价,它应该关闭连接并再次进行身份验证。检查报价函数:
static checkTicks () {
if (!this.lastTick)
return setTimeout(this.checkTicks.bind(this), 5000);
if (this.lastTick + 10000 > Date.now())
return setTimeout(this.checkTicks.bind(this), 5000);
this.lastTick = null;
//??? this.socket.error() does not work as expected
}
如果找不到更多滴答声,我可以通过强制关闭套接字来获得解决方案:
static checkTicks () {
if (!this.lastTick)
return setTimeout(this.checkTicks.bind(this), 5000);
if (this.lastTick + 10000 > Date.now())
return setTimeout(this.checkTicks.bind(this), 5000);
this.lastTick = null;
this.socket.unsubscribe();
/* eslint no-underscore-dangle: 0 */
if (!this.cexSocket._socket)
this.cexSocket._connectSocket();
this.cexSocket._socket.close();
this.socket.subscribe(this.addEvents.bind(this));
setTimeout(this.checkTicks.bind(this), 5000);
}
我还必须退订才能再次获得 运行 的身份验证。这是我的添加事件功能,以防有人对流程感到疑惑:
static addEvents (eventGroup) {
({
disconnecting: grp => grp.subscribe(msg => console.warn(msg)),
connected: grp => grp.subscribe(() => {
this.authenticate();
}),
ping: grp => grp.subscribe(() => this.send({ e: 'pong' })),
auth: () => this.registerTicker(),
tick: grp => {
this.ticker = grp
.pipe(
map(
({
data: {
symbol1: base,
symbol2: quote,
price
}
}) => ({
base,
quote,
price
}))
)
.pipe(share());
grp.subscribe(() => {
this.lastTick = Date.now();
});
}
})[eventGroup.key](eventGroup);
}
更新:
我重写了整件事。现在我只是尝试发送到服务器,如果连接丢失,retryWhen 将捕获它:
export const getTicker = () => {
const ticker = cexSocket
.pipe(retryWhen(errors => {
delay(1000);
return errors;
}))
.pipe(repeatWhen(complete => {
delay(1000);
return complete;
}))
.pipe(groupBy(({ e }) => e))
.pipe(flatMap(grp => {
const o = addEvents(grp);
if (!o)
return Observable.create();
return o;
}));
//This is to make sure the server is still there
const check = interval(15000).pipe(switchMap(() => {
cexSocket.next(JSON.stringify({ e: 'ping' }));
return ticker;
}));
return check;
};
如果客户端与 rxjs websocket 失去连接,我找不到管道重试的方法 class。
this.cexSocket = webSocket({ url: 'wss://ws.cex.io/ws/', WebSocketCtor: websocketModule.default });
this.socket = this.cexSocket
.pipe(retryWhen(errors => {
delay(1000);
return errors;
}))
.pipe(repeatWhen(complete => {
delay(1000);
return complete;
}))
.pipe(groupBy(({ e }) => e));
this.socket.subscribe(this.addEvents.bind(this));
setTimeout(this.checkTicks.bind(this), 5000);
如果客户端没有收到来自服务器的报价,它应该关闭连接并再次进行身份验证。检查报价函数:
static checkTicks () {
if (!this.lastTick)
return setTimeout(this.checkTicks.bind(this), 5000);
if (this.lastTick + 10000 > Date.now())
return setTimeout(this.checkTicks.bind(this), 5000);
this.lastTick = null;
//??? this.socket.error() does not work as expected
}
如果找不到更多滴答声,我可以通过强制关闭套接字来获得解决方案:
static checkTicks () {
if (!this.lastTick)
return setTimeout(this.checkTicks.bind(this), 5000);
if (this.lastTick + 10000 > Date.now())
return setTimeout(this.checkTicks.bind(this), 5000);
this.lastTick = null;
this.socket.unsubscribe();
/* eslint no-underscore-dangle: 0 */
if (!this.cexSocket._socket)
this.cexSocket._connectSocket();
this.cexSocket._socket.close();
this.socket.subscribe(this.addEvents.bind(this));
setTimeout(this.checkTicks.bind(this), 5000);
}
我还必须退订才能再次获得 运行 的身份验证。这是我的添加事件功能,以防有人对流程感到疑惑:
static addEvents (eventGroup) {
({
disconnecting: grp => grp.subscribe(msg => console.warn(msg)),
connected: grp => grp.subscribe(() => {
this.authenticate();
}),
ping: grp => grp.subscribe(() => this.send({ e: 'pong' })),
auth: () => this.registerTicker(),
tick: grp => {
this.ticker = grp
.pipe(
map(
({
data: {
symbol1: base,
symbol2: quote,
price
}
}) => ({
base,
quote,
price
}))
)
.pipe(share());
grp.subscribe(() => {
this.lastTick = Date.now();
});
}
})[eventGroup.key](eventGroup);
}
更新:
我重写了整件事。现在我只是尝试发送到服务器,如果连接丢失,retryWhen 将捕获它:
export const getTicker = () => {
const ticker = cexSocket
.pipe(retryWhen(errors => {
delay(1000);
return errors;
}))
.pipe(repeatWhen(complete => {
delay(1000);
return complete;
}))
.pipe(groupBy(({ e }) => e))
.pipe(flatMap(grp => {
const o = addEvents(grp);
if (!o)
return Observable.create();
return o;
}));
//This is to make sure the server is still there
const check = interval(15000).pipe(switchMap(() => {
cexSocket.next(JSON.stringify({ e: 'ping' }));
return ticker;
}));
return check;
};