如何使用 rxjs 定期检查实时连接?

How to periodically check live connection using rxjs?

我使用 rxjs 来处理 websocket 连接

var socket = Rx.Observable.webSocket('wss://echo.websocket.org')
socket.resultSelector = (e) => e.data

我想定期 (5s) 发送 ping 消息并等待 3s 接收 pong 响应,如果没有收到响应则订阅流。

我试过了,没有成功。我承认我有点迷茫,所有操作员都可以处理超时、deboune 或油门。

// periodically send a ping message
const ping$ = Rx.Observable.interval(2000) 
  .timeInterval()
  .do(() => socket.next('ping'))

const pong$ = socket  
  .filter(m => /^ping$/.test(`${m}`))  
  .mergeMap( 
    ping$.throttle(2000).map(() => Observable.throw('pong timeout'))
  ) 

 pong$.subscribe(
   (msg) => console.log(`end ${msg}`),
   (err) => console.log(`err ${err}`),
   () => console.log(`complete`)
 )

但不幸的是,没有发送任何 ping。

我也尝试过使用但没有成功。

const ping$ = Rx.Observable.interval(2000) 
  .timeInterval()
  .do(() => socket.next('ping'))


const pong$ = socket  
  .filter(m => /^ping$/.test(`${m}`)) 

const heartbeat$ = ping$
  .debounceTime(5000) 
  .mergeMap(() => Rx.Observable.timer(5000).takeUntil(pong$))

heartbeat$.subscribe(
  (msg) => console.log(`end ${msg}`),
  (err) => console.log(`err ${err}`),
  () => console.log(`complete`)
)

感谢任何帮助。

您可以使用 race() 运算符始终仅连接到最先发出的 Observable:

function sendMockPing() {
  // random 0 - 5s delay
  return Observable.of('pong').delay(Math.random() * 10000 / 2);
}

Observable.timer(0, 5000)
  .map(i => 'ping')
  .concatMap(val => {
    return Observable.race(
      Observable.of('timeout').delay(3000),
      sendMockPing()
    );
  })
  //.filter(response => response === 'timeout') // remove all successful responses
  .subscribe(val => console.log(val));

观看现场演示:https://jsbin.com/lavinah/6/edit?js,console

这随机模拟采用 0 - 5s 的响应。当响应时间超过 3 秒时 Observable.of('timeout').delay(3000) 首先完成并且 timeout 字符串由 concatMap() 传递给它的观察者。

我终于找到了基于mergeMaptakeUntil

的解决方案

我最初的错误是使用 ping$ 作为 heartBeat$ 的输入,而我应该使用 $pong

// define the pong$
const pong$ = socket  
  .filter(m => /^ping$/.test(`${m}`)) 
  .share() 

//使用share()因为pong$被使用了两次

const heartbeat$ = pong$
  .startWith('pong') // to bootstrap the stream
  .debounceTime(5000) // wait for 5s after the last received pong$ value
  .do(() => this.socket.next('ping')) // send a ping
  .mergeMap(() => Observable.timer(3000).takeUntil(pong$))
// we merge the current stream with another one that will 
// not produce value while a pong is received before the end of the 
// timer

heartbeat$.subscribe(
  (msg) => console.log(`handle pong timeout`),
)

低于 heartbeat$ 函数 return 一个您可以连续收听的可观察对象

1) 每5000ms

每次往返的latency value(socket.receive - socket.send 的时间)

2) -1 如果往返超过阈值(例如 3000 毫秒)

即使已发出 -1,您仍将继续收到 latency value-1,这让您可以灵活地决定要做什么 ^.^

heartbeat$(pingInterval: number, pongTimeout: number) {
  let start = 0;

  const timer$ = timer(0, pingInterval).pipe(share());

  const unsub = timer$.subscribe(() => {
    start = Date.now();
    this.ws.next('ping');
  });

  const ping$ = this.ws$.pipe(
    switchMap(ws =>
      ws.pipe(
        filter(m => /^ping$/.test(`${m}`)),
        map(() => Date.now() - start),
      ),
    ),
    share(),
  );

  const dead$ = timer$.pipe(
    switchMap(() =>
      of(-1).pipe(
        delay(pongTimeout),
        takeUntil(ping$),
      ),
    ),
  );

  return merge(ping$, dead$).pipe(finalize(() => unsub.unsubscribe()));
}

heartbeat$(5000, 3000).subscribe(
  (latency) => console.log(latency) // 82 83 82 -1 101 82 -1 -1 333 ...etc
)