如何在 rx.js 中链接 Observable

How to chain observables in rx.js

我有一个 observable,它从服务器拉取事件,过滤应用程序类型的事件,然后订阅事件并将其分派给一个或多个处理程序来处理。

然后,处理程序开始对数据库进行一些异步更新,我发现可观察对象会如此快速地启动事件,以至于更新相互影响。这是我应该预料到的。

所以我想我需要我的处理程序,每个处理程序都使用自己的可观察对象来充当队列,该队列将处理一个事件并等待确认。

所以我的问题是,如何创建一个连续接收消息并一次发送一条消息的可观察对象,等待确认后再发布下一条消息?

我认为 observables 也需要是冷的,因为我不能丢失消息。

我认为运算符 concatMap 所做的事情与您正在寻找的东西很接近。您可以在此处查看以前的答案,以说明 concatMap 的类似用例: RxJS queueing dependent tasks

它很接近但不完全是您想要的,因为没有等待 ACK 信号来释放下一个值。相反,concatMap 使用当前 'executed' 可观察对象的完成信号来订阅下一个。如果您的可观察对象在某处包含对数据库执行更新,那么这些更新将按顺序执行。例如:

function handler (source$) {
  // source$ is your source of events from which you generate the update calls
  return source$.concatMap(function (event){
    return updateDB(event);
  })
}

function updateDB(event) {
  return Rx.Observable.create(function(observer){
    // do the update in the db
    // you probably have a success and error handler 
    // you plug the observer notification into those handlers
    if (success) {
      // if you need to pass down some value from the update
      observer.onNext(someValue);
      // In any case, signal completion to allow concatMap to move to next update
      observer.onCompleted();
    }
    if (error) {observer.onError(error);}
  })
}

这是专门针对您正在使用的库的通用代码。您可以直接使用运算符 fromNodeCallbackfromCallback,具体取决于您的数据库更新函数的 API。

尽管如此,请注意,在执行当前可观察对象时,可能会涉及一些缓冲以保持下一个可观察对象,并且该缓冲区只能是有限的,因此如果您在速度上确实存在显着差异在生产者和消费者之间,或者内存限制,你可能希望以不同的方式处理事情。

此外,如果您使用的是 RxJS v5,onError 变为 erroronComplete 变为 completeonNext 变为 next(比照new observer interface).

最后一条评论,您的流的 lossy/lossless 性质是一个不同于流的热与冷性质的概念。您可以查看两种类型的流的