RxJs flatMapLatest/switchMap 取消回调。 onCancel() 在哪里?
RxJs flatMapLatest/switchMap cancel callback. Where is onCancel()?
我有 2 个嵌套的 Observable 流,它们执行 HTTP 请求。现在我想显示一个加载指示器,但无法正常工作。
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
.startWith(1)
.do(function(pageNumber) {
pendingRequests++;
})
.concatMap(function(pageNumber) {
return MyHTTPService.getPage(pageNumber);
})
.do(function(response) {
pendingRequests--;
});
Rx.createObservableFunction(_self, 'search')
.flatMapLatest(function(e) {
return pageStream;
})
.subscribe();
search();
nextPage(2);
nextPage(3);
search();
这将触发 pendingRequests++
4 次,但 pendingRequests--
只会触发一次,因为 flatMapLatest
将在前 3 个 HTTP 响应到达之前取消内部观察。
我找不到任何类似 onCancel
回调的东西。我也尝试了 onCompleted
和 onError
,但这些也不会被 flatMapLatest
触发。
还有其他方法可以使它正常工作吗?
谢谢!
编辑:所需的加载指示器行为
示例:单个 search()
呼叫。
- search() -> 开始加载指示器
- 当 search() 响应返回时 -> 禁用加载指示器
示例:search()
和 nextPage()
调用。 (nextPage() 在 before search() 响应返回之前被调用。)
- search() -> 开始加载指示器
- nextPage() -> 指示器已经启动,但此处无需执行任何操作
- 在 两个 响应到达后停止加载指示器
示例:search()
、search()
。 (search() 调用相互覆盖,尽管第一个调用的响应可以被忽略)
- search() -> 开始加载指示器
- search() -> 指示器已经启动,尽管此处无事可做
- 当 second search() 的响应到达时停止加载指示器
示例:search()
、nextPage()
、search()
。 (再次强调:因为第二次search(),前面search()和nextPage()的响应可以忽略)
- search() -> 开始加载指示器
- nextPage() -> 指示器已经启动,但此处无需执行任何操作
- search() -> 指示器已经启动,尽管此处无事可做
- 当 second search() 的响应到达时停止加载指示器
示例:search()
、nextPage()
。但是这次 nextPage() 是在 search() 响应返回后调用的。
- search() -> 开始加载指示器
- 停止加载指示器,因为 search() 响应到达
- nextPage() -> 开始加载指示器
- 停止加载指示器,因为 nextPage() 响应到达
我尝试使用pendingRequests
计数器,因为我可以同时有多个相关请求(例如:search(), nextPage(), nextPage()
)。然后我当然想在所有相关请求完成后禁用加载指示器。
调用search(), search()
时,第一个search()是无关紧要的。 search(), nextPage(), search()
同样适用。在这两种情况下,只有一个有效的相关请求(最后一个 search()
)。
一种方法:使用 finally 运算符 (rxjs4 docs, rxjs5 source)。 Finally 在可观察对象因任何原因取消订阅或完成时触发。
我还将计数器逻辑移动到 concatMap 函数内部,因为您实际上是在计算 getPage 请求,而不是 已通过的值的数量。这是一个微妙的区别。
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
.startWith(1)
.concatMap(function(pageNumber) {
++pendingRequests;
// assumes getPage returns an Observable and not a Promise
return MyHTTPService.getPage(pageNumber)
.finally(function () { --pendingRequests; })
});
使用 switchMap
又名 flatMapLatest
时,您希望 trim 在新的外部项目到达时尽快执行当前内部流。这无疑是一个很好的设计决定,否则它会带来很多混乱并允许一些诡异的动作。如果你真的想做点什么 onCancel
你总是可以使用自定义 unsubscribe
回调创建你自己的可观察对象。但我仍然建议不要将 unsubscribe
与外部上下文的变化状态结合起来。理想情况下 unsubscribe
只会清理内部使用的资源。
然而,您的特殊情况可以在不访问 onCancel
或类似的情况下解决。关键观察是 - 如果我正确理解您的用例 - 在 search
上所有先前/未决操作可能会被忽略。因此,不用担心递减计数器,我们可以简单地从 1 开始计数。
关于片段的一些评论:
- 用于
BehaviorSubject
计算未决请求 - 因为它已准备好与其他流组合;
- 检查了您在问题中发布的所有案例,它们都有效;
- 添加了一些模糊测试来证明正确性;
- 不确定当
search
仍未决时是否允许 nextPage
- 但似乎只是使用 concatMapTo
与 merge
的问题;
- 仅使用标准
Rx
运算符。
console.clear();
const searchSub = new Rx.Subject(); // trigger search
const nextPageSub = new Rx.Subject(); // triger nextPage
const pendingSub = new Rx.BehaviorSubject(); // counts number of pending requests
const randDurationFactory = min => max => () => Math.random() * (max - min) + min;
const randDuration = randDurationFactory(250)(750);
const addToPending = n => () => pendingSub.next(pendingSub.value + n);
const inc = addToPending(1);
const dec = addToPending(-1);
const fakeSearch = (x) => Rx.Observable.of(x)
.do(() => console.log(`SEARCH-START: ${x}`))
.flatMap(() =>
Rx.Observable.timer(randDuration())
.do(() => console.log(`SEARCH-SUCCESS: ${x}`)))
const fakeNextPage = (x) => Rx.Observable.of(x)
.do(() => console.log(`NEXT-PAGE-START: ${x}`))
.flatMap(() =>
Rx.Observable.timer(randDuration())
.do(() => console.log(`NEXT-PAGE-SUCCESS: ${x}`)))
// subscribes
searchSub
.do(() => console.warn('NEW_SEARCH'))
.do(() => pendingSub.next(1)) // new search -- ingore current state
.switchMap(
(x) => fakeSearch(x)
.do(dec) // search ended
.concatMapTo(nextPageSub // if you wanted to block nextPage when search still pending
// .merge(nextPageSub // if you wanted to allow nextPage when search still pending
.do(inc) // nexpage started
.flatMap(fakeNextPage) // optionally switchMap
.do(dec) // nextpage ended
)
).subscribe();
pendingSub
.filter(x => x !== undefined) // behavior-value initially not defined
.subscribe(n => console.log('PENDING-REQUESTS', n))
// TEST
const test = () => {
searchSub.next('s1');
nextPageSub.next('p1');
nextPageSub.next('p2');
setTimeout(() => searchSub.next('s2'), 200)
}
// test();
// FUZZY-TEST
const COUNTER_MAX = 50;
const randInterval = randDurationFactory(10)(350);
let counter = 0;
const fuzzyTest = () => {
if (counter % 10 === 0) {
searchSub.next('s' + counter++)
}
nextPageSub.next('p' + counter++);
if (counter < COUNTER_MAX) setTimeout(fuzzyTest, randInterval());
}
fuzzyTest()
<script src="https://npmcdn.com/rxjs@5.0.0-beta.11/bundles/Rx.umd.js"></script>
我从头开始为您的问题写了一个解决方案。
可以肯定的是,它可能以更实用的方式编写,但无论如何它都能工作。
此解决方案基于 reqStack
,其中包含所有请求(保持调用顺序),其中请求是具有 id
、done
和 type
属性的对象.
请求完成后,将调用 requestEnd
方法。
有两个条件,其中至少一个就足以隐藏一个加载器。
- 当堆栈上的最后一个请求是
search
请求时,我们可以隐藏加载器。
否则,所有其他请求必须已经完成。
function getInstance() {
return {
loaderVisible: false,
reqStack: [],
requestStart: function (req){
console.log('%s%s req start', req.type, req.id)
if(_.filter(this.reqStack, r => r.done == false).length > 0 && !this.loaderVisible){
this.loaderVisible = true
console.log('loader visible')
}
},
requestEnd: function (req, body, delay){
console.log('%s%s req end (took %sms), body: %s', req.type, req.id, delay, body)
if(req === this.reqStack[this.reqStack.length-1] && req.type == 'search'){
this.hideLoader(req)
return true
} else if(_.filter(this.reqStack, r => r.done == true).length == this.reqStack.length && this.loaderVisible){
this.hideLoader(req)
return true
}
return false
},
hideLoader: function(req){
this.loaderVisible = false
console.log('loader hidden (after %s%s request)', req.type, req.id)
},
getPage: function (req, delay) {
this.requestStart(req)
return Rx.Observable
.fromPromise(Promise.resolve("<body>" + Math.random() + "</body>"))
.delay(delay)
},
search: function (id, delay){
var req = {id: id, done: false, type: 'search'}
this.reqStack.push(req)
return this.getPage(req, delay).map(body => {
_.find(this.reqStack, r => r.id == id && r.type == 'search').done = true
return this.requestEnd(req, body, delay)
})
},
nextPage: function (id, delay){
var req = {id: id, done: false, type: 'nextPage'}
this.reqStack.push(req)
return this.getPage(req, delay).map(body => {
_.find(this.reqStack, r => r.id == id && r.type == 'nextPage').done = true
return this.requestEnd(req, body, delay)
})
},
}
}
Moca 中的单元测试:
describe('animation loader test:', function() {
var sut
beforeEach(function() {
sut = getInstance()
})
it('search', function (done) {
sut.search('1', 10).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, nextPage', function (done) {
sut.search('1', 50).subscribe(expectDidHideLoader)
sut.nextPage('1', 20).subscribe(expectDidNOTHideLoader)
testDone(done)
})
it('search, nextPage, nextPage', function(done) {
sut.search('1', 50).subscribe(expectDidHideLoader)
sut.nextPage('1', 40).subscribe(expectDidNOTHideLoader)
sut.nextPage('2', 30).subscribe(expectDidNOTHideLoader)
testDone(done)
})
it('search, nextPage, nextPage - reverse', function(done) {
sut.search('1', 30).subscribe(expectDidNOTHideLoader)
sut.nextPage('1', 40).subscribe(expectDidNOTHideLoader)
sut.nextPage('2', 50).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, search', function (done) {
sut.search('1', 60).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
sut.search('2', 50).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, search - reverse', function (done) {
sut.search('1', 40).subscribe(expectDidNOTHideLoader)
sut.search('2', 50).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, nextPage, search', function (done) {
sut.search('1', 40).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
sut.nextPage('1', 30).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
sut.search('2', 10).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, nextPage (call after response from search)', function (done) {
sut.search('1', 10).subscribe(result => {
expectDidHideLoader(result)
sut.nextPage('1', 10).subscribe(expectDidHideLoader)
})
testDone(done)
})
function expectDidNOTHideLoader(result){
expect(result).to.be.false
}
function expectDidHideLoader(result){
expect(result).to.be.true
}
function testDone(done){
setTimeout(function(){
done()
}, 200)
}
})
部分输出:
JSFiddle 是 here.
我认为有一个更简单的解决方案,为了解释它,我想 "re-phrase" 您在编辑中提供的示例:
- 只要有未关闭的请求,状态就是"pending"。
- 响应关闭所有先前的请求。
或者,stream/marbles时尚
(O = 请求 [打开],C = 响应 [关闭],p = 待处理,x = 非待处理)
http流:------O---O---O---C---O---C---O---O---C--- O---
------ 状态:x----P------------x---P----x----P-- ------x---P---
您可以看到计数并不重要,我们有一个标志实际上是打开(待定)或关闭(响应是 returned)。这是真的,因为你 switchMap/flatMap,或者正如你在编辑结束时所说,每次只有一个活动请求。
标志实际上是一个布尔值 observable/oberver 或者只是一个主题。
所以,你需要先定义:
var hasPending: Subject<boolean> = BehaviorSubject(false);
BehaviorSubject 的相关性有两个原因:
- 您可以设置一个初始值(false = 没有任何待定)。
- 新订阅者获得最后一个值,因此,即使是稍后创建的组件也会知道您是否有待处理的请求。
接下来的事情就变得简单了,只要你发出一个请求,将 pending 设置为 'true',当一个请求完成后,将 pending 标志设置为 'false'。
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
.startWith(1)
.do(function(pageNumber) {
hasPending.next(true);
})
.concatMap(function(pageNumber) {
return MyHTTPService.getPage(pageNumber);
})
.do(function(response) {
hasPending.next(false);
});
Rx.createObservableFunction(_self, 'search')
.flatMapLatest(函数(e) {
return 页面流;
})
.subscribe();
这是 rxjs 5 语法,对于 rxjs 4 使用 onNext(...)
如果您不需要将您的公寓作为可观察对象,而只是一个值,只需声明:
var hasPending: booolean = false;
然后在.do之前http调用do
hasPending = true;
并在 http 调用后的 .do 中执行
hasPending = false;
就是这样:-)
顺便说一句,在重新阅读所有内容后,您可以通过更简单(虽然有些快速和肮脏)的解决方案来测试它:
将您的 post http 'do' 调用更改为:
.do(function(response) {
pendingRequests = 0;
});
我有 2 个嵌套的 Observable 流,它们执行 HTTP 请求。现在我想显示一个加载指示器,但无法正常工作。
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
.startWith(1)
.do(function(pageNumber) {
pendingRequests++;
})
.concatMap(function(pageNumber) {
return MyHTTPService.getPage(pageNumber);
})
.do(function(response) {
pendingRequests--;
});
Rx.createObservableFunction(_self, 'search')
.flatMapLatest(function(e) {
return pageStream;
})
.subscribe();
search();
nextPage(2);
nextPage(3);
search();
这将触发 pendingRequests++
4 次,但 pendingRequests--
只会触发一次,因为 flatMapLatest
将在前 3 个 HTTP 响应到达之前取消内部观察。
我找不到任何类似 onCancel
回调的东西。我也尝试了 onCompleted
和 onError
,但这些也不会被 flatMapLatest
触发。
还有其他方法可以使它正常工作吗?
谢谢!
编辑:所需的加载指示器行为
示例:单个
search()
呼叫。- search() -> 开始加载指示器
- 当 search() 响应返回时 -> 禁用加载指示器
示例:
search()
和nextPage()
调用。 (nextPage() 在 before search() 响应返回之前被调用。)- search() -> 开始加载指示器
- nextPage() -> 指示器已经启动,但此处无需执行任何操作
- 在 两个 响应到达后停止加载指示器
示例:
search()
、search()
。 (search() 调用相互覆盖,尽管第一个调用的响应可以被忽略)- search() -> 开始加载指示器
- search() -> 指示器已经启动,尽管此处无事可做
- 当 second search() 的响应到达时停止加载指示器
示例:
search()
、nextPage()
、search()
。 (再次强调:因为第二次search(),前面search()和nextPage()的响应可以忽略)- search() -> 开始加载指示器
- nextPage() -> 指示器已经启动,但此处无需执行任何操作
- search() -> 指示器已经启动,尽管此处无事可做
- 当 second search() 的响应到达时停止加载指示器
示例:
search()
、nextPage()
。但是这次 nextPage() 是在 search() 响应返回后调用的。- search() -> 开始加载指示器
- 停止加载指示器,因为 search() 响应到达
- nextPage() -> 开始加载指示器
- 停止加载指示器,因为 nextPage() 响应到达
我尝试使用pendingRequests
计数器,因为我可以同时有多个相关请求(例如:search(), nextPage(), nextPage()
)。然后我当然想在所有相关请求完成后禁用加载指示器。
调用search(), search()
时,第一个search()是无关紧要的。 search(), nextPage(), search()
同样适用。在这两种情况下,只有一个有效的相关请求(最后一个 search()
)。
一种方法:使用 finally 运算符 (rxjs4 docs, rxjs5 source)。 Finally 在可观察对象因任何原因取消订阅或完成时触发。
我还将计数器逻辑移动到 concatMap 函数内部,因为您实际上是在计算 getPage 请求,而不是 已通过的值的数量。这是一个微妙的区别。
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
.startWith(1)
.concatMap(function(pageNumber) {
++pendingRequests;
// assumes getPage returns an Observable and not a Promise
return MyHTTPService.getPage(pageNumber)
.finally(function () { --pendingRequests; })
});
使用 switchMap
又名 flatMapLatest
时,您希望 trim 在新的外部项目到达时尽快执行当前内部流。这无疑是一个很好的设计决定,否则它会带来很多混乱并允许一些诡异的动作。如果你真的想做点什么 onCancel
你总是可以使用自定义 unsubscribe
回调创建你自己的可观察对象。但我仍然建议不要将 unsubscribe
与外部上下文的变化状态结合起来。理想情况下 unsubscribe
只会清理内部使用的资源。
然而,您的特殊情况可以在不访问 onCancel
或类似的情况下解决。关键观察是 - 如果我正确理解您的用例 - 在 search
上所有先前/未决操作可能会被忽略。因此,不用担心递减计数器,我们可以简单地从 1 开始计数。
关于片段的一些评论:
- 用于
BehaviorSubject
计算未决请求 - 因为它已准备好与其他流组合; - 检查了您在问题中发布的所有案例,它们都有效;
- 添加了一些模糊测试来证明正确性;
- 不确定当
search
仍未决时是否允许nextPage
- 但似乎只是使用concatMapTo
与merge
的问题; - 仅使用标准
Rx
运算符。
console.clear();
const searchSub = new Rx.Subject(); // trigger search
const nextPageSub = new Rx.Subject(); // triger nextPage
const pendingSub = new Rx.BehaviorSubject(); // counts number of pending requests
const randDurationFactory = min => max => () => Math.random() * (max - min) + min;
const randDuration = randDurationFactory(250)(750);
const addToPending = n => () => pendingSub.next(pendingSub.value + n);
const inc = addToPending(1);
const dec = addToPending(-1);
const fakeSearch = (x) => Rx.Observable.of(x)
.do(() => console.log(`SEARCH-START: ${x}`))
.flatMap(() =>
Rx.Observable.timer(randDuration())
.do(() => console.log(`SEARCH-SUCCESS: ${x}`)))
const fakeNextPage = (x) => Rx.Observable.of(x)
.do(() => console.log(`NEXT-PAGE-START: ${x}`))
.flatMap(() =>
Rx.Observable.timer(randDuration())
.do(() => console.log(`NEXT-PAGE-SUCCESS: ${x}`)))
// subscribes
searchSub
.do(() => console.warn('NEW_SEARCH'))
.do(() => pendingSub.next(1)) // new search -- ingore current state
.switchMap(
(x) => fakeSearch(x)
.do(dec) // search ended
.concatMapTo(nextPageSub // if you wanted to block nextPage when search still pending
// .merge(nextPageSub // if you wanted to allow nextPage when search still pending
.do(inc) // nexpage started
.flatMap(fakeNextPage) // optionally switchMap
.do(dec) // nextpage ended
)
).subscribe();
pendingSub
.filter(x => x !== undefined) // behavior-value initially not defined
.subscribe(n => console.log('PENDING-REQUESTS', n))
// TEST
const test = () => {
searchSub.next('s1');
nextPageSub.next('p1');
nextPageSub.next('p2');
setTimeout(() => searchSub.next('s2'), 200)
}
// test();
// FUZZY-TEST
const COUNTER_MAX = 50;
const randInterval = randDurationFactory(10)(350);
let counter = 0;
const fuzzyTest = () => {
if (counter % 10 === 0) {
searchSub.next('s' + counter++)
}
nextPageSub.next('p' + counter++);
if (counter < COUNTER_MAX) setTimeout(fuzzyTest, randInterval());
}
fuzzyTest()
<script src="https://npmcdn.com/rxjs@5.0.0-beta.11/bundles/Rx.umd.js"></script>
我从头开始为您的问题写了一个解决方案。
可以肯定的是,它可能以更实用的方式编写,但无论如何它都能工作。
此解决方案基于 reqStack
,其中包含所有请求(保持调用顺序),其中请求是具有 id
、done
和 type
属性的对象.
请求完成后,将调用 requestEnd
方法。
有两个条件,其中至少一个就足以隐藏一个加载器。
- 当堆栈上的最后一个请求是
search
请求时,我们可以隐藏加载器。 否则,所有其他请求必须已经完成。
function getInstance() { return { loaderVisible: false, reqStack: [], requestStart: function (req){ console.log('%s%s req start', req.type, req.id) if(_.filter(this.reqStack, r => r.done == false).length > 0 && !this.loaderVisible){ this.loaderVisible = true console.log('loader visible') } }, requestEnd: function (req, body, delay){ console.log('%s%s req end (took %sms), body: %s', req.type, req.id, delay, body) if(req === this.reqStack[this.reqStack.length-1] && req.type == 'search'){ this.hideLoader(req) return true } else if(_.filter(this.reqStack, r => r.done == true).length == this.reqStack.length && this.loaderVisible){ this.hideLoader(req) return true } return false }, hideLoader: function(req){ this.loaderVisible = false console.log('loader hidden (after %s%s request)', req.type, req.id) }, getPage: function (req, delay) { this.requestStart(req) return Rx.Observable .fromPromise(Promise.resolve("<body>" + Math.random() + "</body>")) .delay(delay) }, search: function (id, delay){ var req = {id: id, done: false, type: 'search'} this.reqStack.push(req) return this.getPage(req, delay).map(body => { _.find(this.reqStack, r => r.id == id && r.type == 'search').done = true return this.requestEnd(req, body, delay) }) }, nextPage: function (id, delay){ var req = {id: id, done: false, type: 'nextPage'} this.reqStack.push(req) return this.getPage(req, delay).map(body => { _.find(this.reqStack, r => r.id == id && r.type == 'nextPage').done = true return this.requestEnd(req, body, delay) }) }, } }
Moca 中的单元测试:
describe('animation loader test:', function() {
var sut
beforeEach(function() {
sut = getInstance()
})
it('search', function (done) {
sut.search('1', 10).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, nextPage', function (done) {
sut.search('1', 50).subscribe(expectDidHideLoader)
sut.nextPage('1', 20).subscribe(expectDidNOTHideLoader)
testDone(done)
})
it('search, nextPage, nextPage', function(done) {
sut.search('1', 50).subscribe(expectDidHideLoader)
sut.nextPage('1', 40).subscribe(expectDidNOTHideLoader)
sut.nextPage('2', 30).subscribe(expectDidNOTHideLoader)
testDone(done)
})
it('search, nextPage, nextPage - reverse', function(done) {
sut.search('1', 30).subscribe(expectDidNOTHideLoader)
sut.nextPage('1', 40).subscribe(expectDidNOTHideLoader)
sut.nextPage('2', 50).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, search', function (done) {
sut.search('1', 60).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
sut.search('2', 50).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, search - reverse', function (done) {
sut.search('1', 40).subscribe(expectDidNOTHideLoader)
sut.search('2', 50).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, nextPage, search', function (done) {
sut.search('1', 40).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
sut.nextPage('1', 30).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
sut.search('2', 10).subscribe(expectDidHideLoader)
testDone(done)
})
it('search, nextPage (call after response from search)', function (done) {
sut.search('1', 10).subscribe(result => {
expectDidHideLoader(result)
sut.nextPage('1', 10).subscribe(expectDidHideLoader)
})
testDone(done)
})
function expectDidNOTHideLoader(result){
expect(result).to.be.false
}
function expectDidHideLoader(result){
expect(result).to.be.true
}
function testDone(done){
setTimeout(function(){
done()
}, 200)
}
})
部分输出:
JSFiddle 是 here.
我认为有一个更简单的解决方案,为了解释它,我想 "re-phrase" 您在编辑中提供的示例:
- 只要有未关闭的请求,状态就是"pending"。
- 响应关闭所有先前的请求。
或者,stream/marbles时尚
(O = 请求 [打开],C = 响应 [关闭],p = 待处理,x = 非待处理)
http流:------O---O---O---C---O---C---O---O---C--- O---
------ 状态:x----P------------x---P----x----P-- ------x---P---
您可以看到计数并不重要,我们有一个标志实际上是打开(待定)或关闭(响应是 returned)。这是真的,因为你 switchMap/flatMap,或者正如你在编辑结束时所说,每次只有一个活动请求。
标志实际上是一个布尔值 observable/oberver 或者只是一个主题。
所以,你需要先定义:
var hasPending: Subject<boolean> = BehaviorSubject(false);
BehaviorSubject 的相关性有两个原因:
- 您可以设置一个初始值(false = 没有任何待定)。
- 新订阅者获得最后一个值,因此,即使是稍后创建的组件也会知道您是否有待处理的请求。
接下来的事情就变得简单了,只要你发出一个请求,将 pending 设置为 'true',当一个请求完成后,将 pending 标志设置为 'false'。
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
.startWith(1)
.do(function(pageNumber) {
hasPending.next(true);
})
.concatMap(function(pageNumber) {
return MyHTTPService.getPage(pageNumber);
})
.do(function(response) {
hasPending.next(false);
});
Rx.createObservableFunction(_self, 'search') .flatMapLatest(函数(e) { return 页面流; }) .subscribe();
这是 rxjs 5 语法,对于 rxjs 4 使用 onNext(...)
如果您不需要将您的公寓作为可观察对象,而只是一个值,只需声明:
var hasPending: booolean = false;
然后在.do之前http调用do
hasPending = true;
并在 http 调用后的 .do 中执行
hasPending = false;
就是这样:-)
顺便说一句,在重新阅读所有内容后,您可以通过更简单(虽然有些快速和肮脏)的解决方案来测试它: 将您的 post http 'do' 调用更改为:
.do(function(response) {
pendingRequests = 0;
});