检查是否未完成 Observable 是否为空
check if not completed Observable is empty
有没有好的方法来检查未完成的 Observable 在那个确切时间是否为空?
let cache = new ReplaySubject<number>(1);
...
// Here I want to know if 'cache' still empty or not. And, for example, fill it with initial value.
cache.isEmpty().subscribe(isEmpty => {
if (isEmpty) {
console.log("I want to be here!!!");
cache.next(0);
}
});
// but that code does not work until cache.complete()
您可以使用 takeUntil()
:
Observable.of(true)
.takeUntil(cache)
.do(isEmpty => {
if (isEmpty) {
console.log("I want to be here!!!");
cache.next(0);
}
})
.subscribe();
然而这只能工作一次。
另一种方法是 "null" 缓存并使用 BehaviorSubject
:
将其初始化为空
let cache = new BehaviorSubject<number>(null as any);
...
cache
.do(content => {
if (content == null) {
console.log("I want to be here!!!");
cache.next(0);
}
})
.subscribe();
当然,您可以立即使用一些默认值初始化缓存。
其实并没有那么简单,被接受的答案也不是很普遍。您想检查 ReplaySubject
在这个特定时间点是否为空。
但是,如果你想让它与 ReplaySubject
真正兼容,你还需要考虑 windowTime
参数 指定 "time to live" 对于通过此对象的每个值。这意味着你的cache
是否为空会及时改变。
ReplaySubject
有方法 _trimBufferThenGetEvents
可以满足您的需要。不幸的是,这个方法是私有的,所以你需要在 JavaScript 中做一点 "hack" 并直接扩展它的 prototype
。
import { ReplaySubject } from 'rxjs';
// Tell the compiler there's a isNowEmpty() method
declare module "rxjs/ReplaySubject" {
interface ReplaySubject<T> {
isNowEmpty(): boolean;
}
}
ReplaySubject.prototype['isNowEmpty'] = function() {
let events = this._trimBufferThenGetEvents();
return events.length > 0;
};
然后使用这个 ReplaySubject
很简单:
let s = new ReplaySubject<number>(1, 100);
s.next(3);
console.log(s.isNowEmpty());
s.next(4);
setTimeout(() => {
s.next(5);
s.subscribe(val => console.log('cached:', val));
console.log(s.isNowEmpty());
}, 200);
setTimeout(() => {
console.log(s.isNowEmpty());
}, 400);
请注意,有些调用 isNowEmpty()
return true
,而另一些调用 return false
。例如最后一个 returns false
因为该值在此期间无效。
此示例打印:
true
cached: 5
true
false
您可以使用 .scan()
来累积您的计数,并将其映射到一个布尔值(无论它是否为非零)。 (它需要第二个参数作为种子值,使其以 0 开头,因此它始终反映当前计数。)
我还添加了 .filter() 而不是 if 语句以使其更清晰:
let cache = new ReplaySubject<number>(1);
cache
.map((object: T) => 1)
.scan((count: number, incoming: number) => count + incoming, 0)
.map((sum) => sum == 0)
.filter((isEmpty: boolean) => isEmpty)
.subscribe((isEmpty: boolean) => {
console.log("I want to be here!!!");
cache.next(0);
});
起点
let cache = new ReplaySubject<number>(1);
isEmpty$ = cache.pipe(mapTo(false), startWith(true));
这表示:
- 无论缓存发出什么值 - 将其映射到
false
。 (因为发射后它不是空的)
- 如果还没有发射任何东西(因为那意味着它是空的),则从
true
开始
有没有好的方法来检查未完成的 Observable 在那个确切时间是否为空?
let cache = new ReplaySubject<number>(1);
...
// Here I want to know if 'cache' still empty or not. And, for example, fill it with initial value.
cache.isEmpty().subscribe(isEmpty => {
if (isEmpty) {
console.log("I want to be here!!!");
cache.next(0);
}
});
// but that code does not work until cache.complete()
您可以使用 takeUntil()
:
Observable.of(true)
.takeUntil(cache)
.do(isEmpty => {
if (isEmpty) {
console.log("I want to be here!!!");
cache.next(0);
}
})
.subscribe();
然而这只能工作一次。
另一种方法是 "null" 缓存并使用 BehaviorSubject
:
let cache = new BehaviorSubject<number>(null as any);
...
cache
.do(content => {
if (content == null) {
console.log("I want to be here!!!");
cache.next(0);
}
})
.subscribe();
当然,您可以立即使用一些默认值初始化缓存。
其实并没有那么简单,被接受的答案也不是很普遍。您想检查 ReplaySubject
在这个特定时间点是否为空。
但是,如果你想让它与 ReplaySubject
真正兼容,你还需要考虑 windowTime
参数 指定 "time to live" 对于通过此对象的每个值。这意味着你的cache
是否为空会及时改变。
ReplaySubject
有方法 _trimBufferThenGetEvents
可以满足您的需要。不幸的是,这个方法是私有的,所以你需要在 JavaScript 中做一点 "hack" 并直接扩展它的 prototype
。
import { ReplaySubject } from 'rxjs';
// Tell the compiler there's a isNowEmpty() method
declare module "rxjs/ReplaySubject" {
interface ReplaySubject<T> {
isNowEmpty(): boolean;
}
}
ReplaySubject.prototype['isNowEmpty'] = function() {
let events = this._trimBufferThenGetEvents();
return events.length > 0;
};
然后使用这个 ReplaySubject
很简单:
let s = new ReplaySubject<number>(1, 100);
s.next(3);
console.log(s.isNowEmpty());
s.next(4);
setTimeout(() => {
s.next(5);
s.subscribe(val => console.log('cached:', val));
console.log(s.isNowEmpty());
}, 200);
setTimeout(() => {
console.log(s.isNowEmpty());
}, 400);
请注意,有些调用 isNowEmpty()
return true
,而另一些调用 return false
。例如最后一个 returns false
因为该值在此期间无效。
此示例打印:
true
cached: 5
true
false
您可以使用 .scan()
来累积您的计数,并将其映射到一个布尔值(无论它是否为非零)。 (它需要第二个参数作为种子值,使其以 0 开头,因此它始终反映当前计数。)
我还添加了 .filter() 而不是 if 语句以使其更清晰:
let cache = new ReplaySubject<number>(1);
cache
.map((object: T) => 1)
.scan((count: number, incoming: number) => count + incoming, 0)
.map((sum) => sum == 0)
.filter((isEmpty: boolean) => isEmpty)
.subscribe((isEmpty: boolean) => {
console.log("I want to be here!!!");
cache.next(0);
});
起点
let cache = new ReplaySubject<number>(1);
isEmpty$ = cache.pipe(mapTo(false), startWith(true));
这表示:
- 无论缓存发出什么值 - 将其映射到
false
。 (因为发射后它不是空的) - 如果还没有发射任何东西(因为那意味着它是空的),则从
true
开始