如何在 rx.js 中链接 Observable
How to chain observables in rx.js
我有一个 observable,它从服务器拉取事件,过滤应用程序类型的事件,然后订阅事件并将其分派给一个或多个处理程序来处理。
然后,处理程序开始对数据库进行一些异步更新,我发现可观察对象会如此快速地启动事件,以至于更新相互影响。这是我应该预料到的。
所以我想我需要我的处理程序,每个处理程序都使用自己的可观察对象来充当队列,该队列将处理一个事件并等待确认。
所以我的问题是,如何创建一个连续接收消息并一次发送一条消息的可观察对象,等待确认后再发布下一条消息?
我认为 observables 也需要是冷的,因为我不能丢失消息。
我认为运算符 concatMap
所做的事情与您正在寻找的东西很接近。您可以在此处查看以前的答案,以说明 concatMap
的类似用例:
RxJS queueing dependent tasks
它很接近但不完全是您想要的,因为没有等待 ACK
信号来释放下一个值。相反,concatMap
使用当前 'executed' 可观察对象的完成信号来订阅下一个。如果您的可观察对象在某处包含对数据库执行更新,那么这些更新将按顺序执行。例如:
function handler (source$) {
// source$ is your source of events from which you generate the update calls
return source$.concatMap(function (event){
return updateDB(event);
})
}
function updateDB(event) {
return Rx.Observable.create(function(observer){
// do the update in the db
// you probably have a success and error handler
// you plug the observer notification into those handlers
if (success) {
// if you need to pass down some value from the update
observer.onNext(someValue);
// In any case, signal completion to allow concatMap to move to next update
observer.onCompleted();
}
if (error) {observer.onError(error);}
})
}
这是专门针对您正在使用的库的通用代码。您可以直接使用运算符 fromNodeCallback
或 fromCallback
,具体取决于您的数据库更新函数的 API。
尽管如此,请注意,在执行当前可观察对象时,可能会涉及一些缓冲以保持下一个可观察对象,并且该缓冲区只能是有限的,因此如果您在速度上确实存在显着差异在生产者和消费者之间,或者内存限制,你可能希望以不同的方式处理事情。
此外,如果您使用的是 RxJS v5,onError
变为 error
,onComplete
变为 complete
,onNext
变为 next
(比照new observer interface).
最后一条评论,您的流的 lossy/lossless 性质是一个不同于流的热与冷性质的概念。您可以查看两种类型的流的 。
我有一个 observable,它从服务器拉取事件,过滤应用程序类型的事件,然后订阅事件并将其分派给一个或多个处理程序来处理。
然后,处理程序开始对数据库进行一些异步更新,我发现可观察对象会如此快速地启动事件,以至于更新相互影响。这是我应该预料到的。
所以我想我需要我的处理程序,每个处理程序都使用自己的可观察对象来充当队列,该队列将处理一个事件并等待确认。
所以我的问题是,如何创建一个连续接收消息并一次发送一条消息的可观察对象,等待确认后再发布下一条消息?
我认为 observables 也需要是冷的,因为我不能丢失消息。
我认为运算符 concatMap
所做的事情与您正在寻找的东西很接近。您可以在此处查看以前的答案,以说明 concatMap
的类似用例:
RxJS queueing dependent tasks
它很接近但不完全是您想要的,因为没有等待 ACK
信号来释放下一个值。相反,concatMap
使用当前 'executed' 可观察对象的完成信号来订阅下一个。如果您的可观察对象在某处包含对数据库执行更新,那么这些更新将按顺序执行。例如:
function handler (source$) {
// source$ is your source of events from which you generate the update calls
return source$.concatMap(function (event){
return updateDB(event);
})
}
function updateDB(event) {
return Rx.Observable.create(function(observer){
// do the update in the db
// you probably have a success and error handler
// you plug the observer notification into those handlers
if (success) {
// if you need to pass down some value from the update
observer.onNext(someValue);
// In any case, signal completion to allow concatMap to move to next update
observer.onCompleted();
}
if (error) {observer.onError(error);}
})
}
这是专门针对您正在使用的库的通用代码。您可以直接使用运算符 fromNodeCallback
或 fromCallback
,具体取决于您的数据库更新函数的 API。
尽管如此,请注意,在执行当前可观察对象时,可能会涉及一些缓冲以保持下一个可观察对象,并且该缓冲区只能是有限的,因此如果您在速度上确实存在显着差异在生产者和消费者之间,或者内存限制,你可能希望以不同的方式处理事情。
此外,如果您使用的是 RxJS v5,onError
变为 error
,onComplete
变为 complete
,onNext
变为 next
(比照new observer interface).
最后一条评论,您的流的 lossy/lossless 性质是一个不同于流的热与冷性质的概念。您可以查看两种类型的流的