可观察订阅不执行

Observable Subscribe Doesnt Execute

我试图一次显示列表中的消息 1 秒。我不能使用 interval(1000),因为我需要能够在满足条件后立即清除消息并显示下一条消息 1 秒。

到目前为止我有

nextMessage() : Observable<any>{
    this.currentMessage.next(this.messages[this.messagePointer++]);
    return this.currentMessages;
}

subscribeMessage() : void{
    this.currentMessage.pipe(switchMap(_=timer(1000).pipe(map(x=>this.nextMessage())))).subscribe(x=>console.log(x));
}

这目前没有产生任何结果。我的想法是 nextMessage 例程每隔 1 秒就会更新 currentMessage Observable,如果它在两者之间由外部源更新,则 Timer 将被取消并重新启动。 subscribe 方法目前没有任何输出,但是如果我尝试这个

this.currentMessage.pipe(tap(x=>console.log(x)),switchMap(_=timer(1000).pipe(map(x=>this.nextMessage())))).subscribe(x=>console.log(x));

我每隔 1 秒查看一次数据输出值。为什么点击有效但订阅无效。

编辑

这是一个link到一个StackBlitz Demo

问题:switchMap 在内部 observable 发出之前杀死 observable。

流程如下:

  1. currentMessage 发出消息
  2. tap 记录该消息
  3. switchMap 启动计时器(1000)
  4. 当计时器触发时,您调用 nextMessage()
  5. nextMessage 然后将一个值推送到 currentMessage
  6. 在你移动到下一行 nextMessage 之前,currentMesage 发出一条消息(与步骤 1 相同)
  7. tap 记录该消息(与步骤 2 相同)
  8. switchMap 杀死之前的 observable,并启动一个新的计时器(1000)
  9. 我们回到 nextMessage(),它 returns 是一个 observable 但它被忽略了,因为它在上一步中被杀死了
  10. 循环回到第 4 步

一个简单的解决方法是将 switchMap 更改为 mergeMap,但不要忘记在内部可观察对象上添加 take(1),因为您只想拥有该可观察值只有一个值:

currentMessage
  .pipe(
    tap(x => console.log('Tap:' + x)),
    mergeMap(_ => timer(1000).pipe(
      map(x => nextMessage()),
      take(1)
    ))
  )
  .subscribe(x => console.log(x));

然而,这很难理解发生了什么......如果你转向更具反应性的方法,我认为它会变得更简单。

你说你不能使用 interval(1000) 因为你需要根据某些条件重新设置。你为什么不把那个条件做成一个主题,这样你就可以把它写下来?检查这个:

const messageResets = new Subject();

const messages: string[] = ['Message 1', 'Message 2', 'Message 3'];
let messagePointer: number = 0;

console.clear();

messageResets.pipe(
  startWith(null),
  // Every time we need to clear the current message, we just switchMap to a new interval
  switchMap(() => timer(0, 1000).pipe(
    map(() => messages[messagePointer++ % messages.length])
  ))
).subscribe(x => console.log(x));

注意:timer(0, 1000)interval(1000) 相同,但它只是在开始时发出一个值,而 interval 则不会。

通常我只创建主题来表达流外部的东西:在这种情况下,我希望能够根据我不知道的条件从外部清除当前消息。

但是处理对象以保持某种内部状态从长远来看总是很痛苦的 运行。