Repeating/Resetting 一个可观察的

Repeating/Resetting an observable

我正在使用 rxjs 为智能电视上的遥控器创建一个 "channel nummber" 选择器。这个想法是,当你输入数字时,你会在屏幕上看到它们,当你输入完数字后,用户实际上会被带到那个频道。

我使用两个 observables 来实现这个:

  1. 一个 "progress" 流,它侦听所有数字输入并在通过扫描运算符输入数字时发出连接的数字字符串。

  2. 一个 "completed" 流,在 n 毫秒没有输入任何数字后,将发出完成的最终数字字符串。例如:1-2-3 -> "123".

这是我用来尝试解决此问题的代码:

频道数:

module.exports = function (numberKeys, source, scheduler) {
    return function (completedDelay) {
        var toNumericString = function (name) {
                return numberKeys.indexOf(name).toString();
            },
            concat = function (current, numeric) {
                return current.length === 3 ? current : current + numeric;
            },
            live = createPress(source, scheduler)(numberKeys)
                .map(toNumericString)
                .scan(concat, '')
                .distinctUntilChanged(),

            completed = live.flatMapLatest(function (value) {
                return Rx.Observable.timer(completedDelay, scheduler).map(value);
            }),
            progress = live.takeUntil(completed).repeat();

        return {
            progress: progress,
            completed: completed
        };
    };
};

createPress:

module.exports = function (source, scheduler) {
    return function (keyName, throttle) {
        return source
            .filter(H.isKeyDownOf(keyName))
            .map(H.toKeyName);
    };
};

创建源:

module.exports = function (provider) {
    var createStream = function (event) {
        var filter = function (e) {
                return provider.hasCode(e.keyCode);
            },
            map = function (e) {
                return {
                    type: event,
                    name: provider.getName(e.keyCode),
                    code: e.keyCode
                };
            };
        return Rx.Observable.fromEvent(document, event)
            .filter(filter)
            .map(map);
    };

    return Rx.Observable.merge(createStream('keyup'), createStream('keydown'));
};

有趣的是,上面的代码在测试条件下(使用 Rx.TestScheduler 模拟源和调度程序)按预期工作。但是在生产中,当调度程序根本没有通过并且源是 createPress(上图)的结果时,进度流只会发出直到完成,然后再也不会发出。就好像重复被完全忽略或多余了。我不知道为什么。

我是不是遗漏了什么?

您可以使用 Window. In this case, I would suggest WindowWithTime. You can also do more interesting things like use Window(windowBoundaries) and then pass the source with Debounce 作为边界。

source
  .windowWithTime(1500)
  .flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))

此外,由于我们的 windows 是封闭的可观察对象,我们可以使用 Reduce 来累积 window 的值并连接我们的数字。


现在,此变体将在 1.5 秒后关闭。相反,我们希望在最后一次按键后等待 x 秒。天真我们可以做 source.window(source.debounce(1000)) 但现在我们订阅了我们的来源两次,这是我们想要避免的事情,原因有两个。首先我们不知道订阅有什么副作用,其次我们不知道订阅会收到事件的顺序。最后一件事不是问题,因为我们使用的去抖动已经在最后一次按键后增加了延迟,但仍然需要考虑。

解决方法是publish our source. In order to keep the publish inside the sequence, we wrap it into observable.create.

Rx.Observable.create(observer => {
    var ob = source.publish();
    return new Rx.CompositeDisposable(
      ob.window(ob.debounce(1000))
        .subscribe(observer),
      ob.connect());
}).flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))

编辑:或像这样使用 publish

source.publish(ob => ob.window(ob.debounce(1000)))
    .flatMap(ob => ob.reduce((acc, cur) => acc + cur, ""))