检查是否未完成 Observable 是否为空

check if not completed Observable is empty

有没有好的方法来检查未完成的 Observable 在那个确切时间是否为空?

let cache = new ReplaySubject<number>(1);
...
// Here I want to know if 'cache' still empty or not. And, for example, fill it with initial value.
cache.isEmpty().subscribe(isEmpty => {
    if (isEmpty) {
        console.log("I want to be here!!!");
        cache.next(0);
    }
});
// but that code does not work until cache.complete()

您可以使用 takeUntil():

Observable.of(true)
    .takeUntil(cache)
    .do(isEmpty => {
        if (isEmpty) {
            console.log("I want to be here!!!");
            cache.next(0);
        }
    })
    .subscribe();

然而这只能工作一次。


另一种方法是 "null" 缓存并使用 BehaviorSubject:

将其初始化为空
let cache = new BehaviorSubject<number>(null as any);
...
cache
   .do(content => {
       if (content == null) {
           console.log("I want to be here!!!");
           cache.next(0);
       }
    })
    .subscribe();

当然,您可以立即使用一些默认值初始化缓存。

其实并没有那么简单,被接受的答案也不是很普遍。您想检查 ReplaySubject 在这个特定时间点是否为空。

但是,如果你想让它与 ReplaySubject 真正兼容,你还需要考虑 windowTime 参数 指定 "time to live" 对于通过此对象的每个值。这意味着你的cache是否为空会及时改变

ReplaySubject 有方法 _trimBufferThenGetEvents 可以满足您的需要。不幸的是,这个方法是私有的,所以你需要在 JavaScript 中做一点 "hack" 并直接扩展它的 prototype

import { ReplaySubject } from 'rxjs';

// Tell the compiler there's a isNowEmpty() method
declare module "rxjs/ReplaySubject" {
    interface ReplaySubject<T> {
        isNowEmpty(): boolean;
    }
}

ReplaySubject.prototype['isNowEmpty'] = function() {
    let events = this._trimBufferThenGetEvents();
    return events.length > 0;
};

然后使用这个 ReplaySubject 很简单:

let s = new ReplaySubject<number>(1, 100);
s.next(3);
console.log(s.isNowEmpty());
s.next(4);

setTimeout(() => {
    s.next(5);
    s.subscribe(val => console.log('cached:', val));
    console.log(s.isNowEmpty());
}, 200);

setTimeout(() => {
    console.log(s.isNowEmpty());
}, 400);

请注意,有些调用 isNowEmpty() return true,而另一些调用 return false。例如最后一个 returns false 因为该值在此期间无效。

此示例打印:

true
cached: 5
true
false

观看现场演示:https://jsbin.com/sutaka/3/edit?js,console

您可以使用 .scan() 来累积您的计数,并将其映射到一个布尔值(无论它是否为非零)。 (它需要第二个参数作为种子值,使其以 0 开头,因此它始终反映当前计数。)

我还添加了 .filter() 而不是 if 语句以使其更清晰:

let cache = new ReplaySubject<number>(1);

cache
    .map((object: T) => 1)
    .scan((count: number, incoming: number) => count + incoming, 0)
    .map((sum) => sum == 0)
    .filter((isEmpty: boolean) => isEmpty)
    .subscribe((isEmpty: boolean) => {
        console.log("I want to be here!!!");
        cache.next(0);
    });

起点

let cache = new ReplaySubject<number>(1);

isEmpty$ = cache.pipe(mapTo(false), startWith(true));

这表示:

  • 无论缓存发出什么值 - 将其映射到 false。 (因为发射后它不是空的)
  • 如果还没有发射任何东西(因为那意味着它是空的),则从 true 开始