如何使用 rxjs 定期检查实时连接?
How to periodically check live connection using rxjs?
我使用 rxjs 来处理 websocket 连接
var socket = Rx.Observable.webSocket('wss://echo.websocket.org')
socket.resultSelector = (e) => e.data
我想定期 (5s) 发送 ping
消息并等待 3s 接收 pong
响应,如果没有收到响应则订阅流。
我试过了,没有成功。我承认我有点迷茫,所有操作员都可以处理超时、deboune 或油门。
// periodically send a ping message
const ping$ = Rx.Observable.interval(2000)
.timeInterval()
.do(() => socket.next('ping'))
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
.mergeMap(
ping$.throttle(2000).map(() => Observable.throw('pong timeout'))
)
pong$.subscribe(
(msg) => console.log(`end ${msg}`),
(err) => console.log(`err ${err}`),
() => console.log(`complete`)
)
但不幸的是,没有发送任何 ping。
我也尝试过使用但没有成功。
const ping$ = Rx.Observable.interval(2000)
.timeInterval()
.do(() => socket.next('ping'))
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
const heartbeat$ = ping$
.debounceTime(5000)
.mergeMap(() => Rx.Observable.timer(5000).takeUntil(pong$))
heartbeat$.subscribe(
(msg) => console.log(`end ${msg}`),
(err) => console.log(`err ${err}`),
() => console.log(`complete`)
)
感谢任何帮助。
您可以使用 race()
运算符始终仅连接到最先发出的 Observable:
function sendMockPing() {
// random 0 - 5s delay
return Observable.of('pong').delay(Math.random() * 10000 / 2);
}
Observable.timer(0, 5000)
.map(i => 'ping')
.concatMap(val => {
return Observable.race(
Observable.of('timeout').delay(3000),
sendMockPing()
);
})
//.filter(response => response === 'timeout') // remove all successful responses
.subscribe(val => console.log(val));
观看现场演示:https://jsbin.com/lavinah/6/edit?js,console
这随机模拟采用 0 - 5s
的响应。当响应时间超过 3 秒时 Observable.of('timeout').delay(3000)
首先完成并且 timeout
字符串由 concatMap()
传递给它的观察者。
我终于找到了基于mergeMap
和takeUntil
的解决方案
我最初的错误是使用 ping$ 作为 heartBeat$ 的输入,而我应该使用 $pong
// define the pong$
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
.share()
//使用share()因为pong$被使用了两次
const heartbeat$ = pong$
.startWith('pong') // to bootstrap the stream
.debounceTime(5000) // wait for 5s after the last received pong$ value
.do(() => this.socket.next('ping')) // send a ping
.mergeMap(() => Observable.timer(3000).takeUntil(pong$))
// we merge the current stream with another one that will
// not produce value while a pong is received before the end of the
// timer
heartbeat$.subscribe(
(msg) => console.log(`handle pong timeout`),
)
低于 heartbeat$ 函数 return 一个您可以连续收听的可观察对象
1) 每5000ms
每次往返的latency value
(socket.receive - socket.send 的时间)
或
2) -1
如果往返超过阈值(例如 3000 毫秒)
即使已发出 -1
,您仍将继续收到 latency value
或 -1
,这让您可以灵活地决定要做什么 ^.^
heartbeat$(pingInterval: number, pongTimeout: number) {
let start = 0;
const timer$ = timer(0, pingInterval).pipe(share());
const unsub = timer$.subscribe(() => {
start = Date.now();
this.ws.next('ping');
});
const ping$ = this.ws$.pipe(
switchMap(ws =>
ws.pipe(
filter(m => /^ping$/.test(`${m}`)),
map(() => Date.now() - start),
),
),
share(),
);
const dead$ = timer$.pipe(
switchMap(() =>
of(-1).pipe(
delay(pongTimeout),
takeUntil(ping$),
),
),
);
return merge(ping$, dead$).pipe(finalize(() => unsub.unsubscribe()));
}
heartbeat$(5000, 3000).subscribe(
(latency) => console.log(latency) // 82 83 82 -1 101 82 -1 -1 333 ...etc
)
我使用 rxjs 来处理 websocket 连接
var socket = Rx.Observable.webSocket('wss://echo.websocket.org')
socket.resultSelector = (e) => e.data
我想定期 (5s) 发送 ping
消息并等待 3s 接收 pong
响应,如果没有收到响应则订阅流。
我试过了,没有成功。我承认我有点迷茫,所有操作员都可以处理超时、deboune 或油门。
// periodically send a ping message
const ping$ = Rx.Observable.interval(2000)
.timeInterval()
.do(() => socket.next('ping'))
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
.mergeMap(
ping$.throttle(2000).map(() => Observable.throw('pong timeout'))
)
pong$.subscribe(
(msg) => console.log(`end ${msg}`),
(err) => console.log(`err ${err}`),
() => console.log(`complete`)
)
但不幸的是,没有发送任何 ping。
我也尝试过使用但没有成功。
const ping$ = Rx.Observable.interval(2000)
.timeInterval()
.do(() => socket.next('ping'))
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
const heartbeat$ = ping$
.debounceTime(5000)
.mergeMap(() => Rx.Observable.timer(5000).takeUntil(pong$))
heartbeat$.subscribe(
(msg) => console.log(`end ${msg}`),
(err) => console.log(`err ${err}`),
() => console.log(`complete`)
)
感谢任何帮助。
您可以使用 race()
运算符始终仅连接到最先发出的 Observable:
function sendMockPing() {
// random 0 - 5s delay
return Observable.of('pong').delay(Math.random() * 10000 / 2);
}
Observable.timer(0, 5000)
.map(i => 'ping')
.concatMap(val => {
return Observable.race(
Observable.of('timeout').delay(3000),
sendMockPing()
);
})
//.filter(response => response === 'timeout') // remove all successful responses
.subscribe(val => console.log(val));
观看现场演示:https://jsbin.com/lavinah/6/edit?js,console
这随机模拟采用 0 - 5s
的响应。当响应时间超过 3 秒时 Observable.of('timeout').delay(3000)
首先完成并且 timeout
字符串由 concatMap()
传递给它的观察者。
我终于找到了基于mergeMap
和takeUntil
我最初的错误是使用 ping$ 作为 heartBeat$ 的输入,而我应该使用 $pong
// define the pong$
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
.share()
//使用share()因为pong$被使用了两次
const heartbeat$ = pong$
.startWith('pong') // to bootstrap the stream
.debounceTime(5000) // wait for 5s after the last received pong$ value
.do(() => this.socket.next('ping')) // send a ping
.mergeMap(() => Observable.timer(3000).takeUntil(pong$))
// we merge the current stream with another one that will
// not produce value while a pong is received before the end of the
// timer
heartbeat$.subscribe(
(msg) => console.log(`handle pong timeout`),
)
低于 heartbeat$ 函数 return 一个您可以连续收听的可观察对象
1) 每5000ms
每次往返的latency value
(socket.receive - socket.send 的时间)
或
2) -1
如果往返超过阈值(例如 3000 毫秒)
即使已发出 -1
,您仍将继续收到 latency value
或 -1
,这让您可以灵活地决定要做什么 ^.^
heartbeat$(pingInterval: number, pongTimeout: number) {
let start = 0;
const timer$ = timer(0, pingInterval).pipe(share());
const unsub = timer$.subscribe(() => {
start = Date.now();
this.ws.next('ping');
});
const ping$ = this.ws$.pipe(
switchMap(ws =>
ws.pipe(
filter(m => /^ping$/.test(`${m}`)),
map(() => Date.now() - start),
),
),
share(),
);
const dead$ = timer$.pipe(
switchMap(() =>
of(-1).pipe(
delay(pongTimeout),
takeUntil(ping$),
),
),
);
return merge(ping$, dead$).pipe(finalize(() => unsub.unsubscribe()));
}
heartbeat$(5000, 3000).subscribe(
(latency) => console.log(latency) // 82 83 82 -1 101 82 -1 -1 333 ...etc
)