RxJs:等待来自 Observable 的值,如果它在指定的时间段后没有出现,则在 n 次尝试 throwError 之后做一些事情并再次等待

RxJs: Wait on value from Observable, if it doesn't come after a specified time period do some stuff and wait again, after n tries throwError

那么我的情况是: 我正在向网络发送信令消息并希望等待响应,如果 2 秒内没有响应,我想再次发送信令消息。重试 3 次后,我想报错并在某处登录。 我正在订阅下面的简化版:

from(messages).pipe(filter(value => condition(value)))

我需要等待,直到它发出预期值并同时执行上述过程。我将不胜感激!

例如:

signal().pipe(
  timeout(2000),
  retry(3)
)
.subscribe({
  next: () => {},
  error: () => {}
})

这是一个 interactive example尝试在 switchMap 回调中使用计时)

注意:如果您想对源流进行其他操作——您可能 share()