RxJS 在处理下一个缓冲块之前等待 ajax 响应
RxJS await ajax respose before processing next buffered chunk
我将 RxJS 与一个基本的 Subject 对象一起使用,它在未知的时间从不同的地方接收输入。这些输入需要堆叠起来并异步发送到服务器 - 但我需要等到 ajax 请求完成,然后再尝试处理下一组缓冲输入。
flatMapLatest 不适合我的情况,因为每个响应都必须单独处理。
我可以在处理 ajax 请求时暂停缓冲区,但这意味着我会丢失输入,这是不可接受的。
我看到缓冲区打开对象只是一个可观察对象,我能否以某种方式将定时器可观察对象和自定义可观察对象结合起来,当我的 ajax 请求完成时和 ajax 请求完成时触发已发送 我可以暂停定时器一吗?
抱歉,我还没有任何代码可以展示,我真的还在设计阶段。
我想做的事情可行还是我做的不对?
编辑
这是我目前的代码,它似乎可以满足我的要求 - 但我仍然愿意改进。
var bufferedIntervalPauser = new Rx.Subject<boolean>();
var pausableInterval = Rx.Observable.interval(500).pausable(bufferedIntervalPauser);
var purStream = this.updateReqestSubject.asObservable()
.buffer(pausableInterval)
.where(b => (b || []).length > 0)
var purSub = purStream.subscribe(
next => {
bufferedIntervalPauser.onNext(false); // pause the buffer window
SomeAjaxMethod(next, {
success: res => {
this.HandleResult(res);
},
always: () => {
// when the ajax completes, resume the buffer
bufferedIntervalPauser.onNext(true);
}
});
},
err => {
console.error(err);
});
// start
bufferedIntervalPauser.onNext(true);
您应该可以只使用 buffer
运算符。正如@Enigmativity 指出的那样,Rx 有一个序列化协议,因此您的处理程序永远不会 运行 同时发生,即它不会在上一次调用仍在 运行ning 时被调用。但是,如果您的处理程序本身调用了一个 non-blocking/asnyc 方法,那么在创建一些异步响应之前,没有任何东西会阻止 Rx 调用您的处理程序。在这种情况下,您可能希望将异步调用包装在一个可观察的序列中。然后你可以使用像concat
这样的运算符来确保多个可观察序列的序列化
编辑:
您的问题是您在订阅中进行异步工作。您需要做的是将您的 SomeAjaxMethod
视为一个可观察的序列,然后只使用组合。
//Psuedo code as I dont have a Java IDE at hand
this.updateReqestSubject.asObservable()
.Buffer(500ms)
.Select(buffer=>SomeAjaxMethod(buffer).asObservable().Catch(/*Swallow to match example*/))
.Concat() //Concatenate the IObservable<IObservable<>> into IObservable<>
.Subscribe(
res=>this.HandleResult(res),
err => {
console.error(err);
});
我将 RxJS 与一个基本的 Subject 对象一起使用,它在未知的时间从不同的地方接收输入。这些输入需要堆叠起来并异步发送到服务器 - 但我需要等到 ajax 请求完成,然后再尝试处理下一组缓冲输入。
flatMapLatest 不适合我的情况,因为每个响应都必须单独处理。
我可以在处理 ajax 请求时暂停缓冲区,但这意味着我会丢失输入,这是不可接受的。
我看到缓冲区打开对象只是一个可观察对象,我能否以某种方式将定时器可观察对象和自定义可观察对象结合起来,当我的 ajax 请求完成时和 ajax 请求完成时触发已发送 我可以暂停定时器一吗?
抱歉,我还没有任何代码可以展示,我真的还在设计阶段。
我想做的事情可行还是我做的不对?
编辑
这是我目前的代码,它似乎可以满足我的要求 - 但我仍然愿意改进。
var bufferedIntervalPauser = new Rx.Subject<boolean>();
var pausableInterval = Rx.Observable.interval(500).pausable(bufferedIntervalPauser);
var purStream = this.updateReqestSubject.asObservable()
.buffer(pausableInterval)
.where(b => (b || []).length > 0)
var purSub = purStream.subscribe(
next => {
bufferedIntervalPauser.onNext(false); // pause the buffer window
SomeAjaxMethod(next, {
success: res => {
this.HandleResult(res);
},
always: () => {
// when the ajax completes, resume the buffer
bufferedIntervalPauser.onNext(true);
}
});
},
err => {
console.error(err);
});
// start
bufferedIntervalPauser.onNext(true);
您应该可以只使用 buffer
运算符。正如@Enigmativity 指出的那样,Rx 有一个序列化协议,因此您的处理程序永远不会 运行 同时发生,即它不会在上一次调用仍在 运行ning 时被调用。但是,如果您的处理程序本身调用了一个 non-blocking/asnyc 方法,那么在创建一些异步响应之前,没有任何东西会阻止 Rx 调用您的处理程序。在这种情况下,您可能希望将异步调用包装在一个可观察的序列中。然后你可以使用像concat
这样的运算符来确保多个可观察序列的序列化
编辑:
您的问题是您在订阅中进行异步工作。您需要做的是将您的 SomeAjaxMethod
视为一个可观察的序列,然后只使用组合。
//Psuedo code as I dont have a Java IDE at hand
this.updateReqestSubject.asObservable()
.Buffer(500ms)
.Select(buffer=>SomeAjaxMethod(buffer).asObservable().Catch(/*Swallow to match example*/))
.Concat() //Concatenate the IObservable<IObservable<>> into IObservable<>
.Subscribe(
res=>this.HandleResult(res),
err => {
console.error(err);
});