如何在 fromEvent("scroll') 流发布时触发 BehaviorSubject 流的过滤器?

How to trigger a filter of a BehaviorSubject stream when fromEvent("scroll') stream publishes?

前言: 我有一些不正确的程序代码,希望它能传达意图。我感觉像 flatMapswitchMap 这样的运算符会解决这个问题;然而,我仍然没有在 RXJS 中实现思想上的飞跃,以真正理解如何或何时使用这些运算符。所以请多多包涵。

描述: 我有 2 个流:

问题: 当滚动事件发生时,如何触发 var sampleFiltered = this.sampleElementOffsetData 上的过滤器? (这目前没有触发。)此外,我将如何清理嵌套订阅,因为它们目前只是为了传达意图而根本不工作。

import { Directive, HostListener } from '@angular/core';
import { ScrollerElementsStoreModel } from './scroller-elements-store.model';
import 'rxjs/add/operator/filter';
import {Observable} from "rxjs";
import {BehaviorSubject} from 'rxjs/Rx';
import 'rxjs/add/observable/fromEvent';
import 'rxjs/add/operator/filter';

@Directive({
  selector: '[mh-scroll]'
})
export class MhScroll {
  lastKnownScrollPosition: number;
  ticking: boolean;
  sampleElementOffsetData: BehaviorSubject<number[]>;

  constructor(private scrollElementsStore: ScrollerElementsStoreModel) {
    this.lastKnownScrollPosition = 0;
    this.ticking = false;
    this.sampleElementOffsetData = new BehaviorSubject([100,350]);
  }


    isElementCloseToTop(target) {

      var scrolling = Observable.fromEvent(target, 'scroll'); //this works


      var scrollingSub = scrolling.subscribe(
          (x) => { // this works.

            // this section below is completely wrong; 
            // however, hopefully shows intent, that when 
            // scrolling.subscribe triggers I want to run this filter.
            var sampleFiltered = this.sampleElementOffsetData.filter((x) => {
              var offset = x + this.lastKnownScrollPosition;

              // return when element offset is within 10px.
              return offset < 10 && offset > 0 
            },this);


            var sub =  sampleFiltered.subscribe(
              (y) => {
               // Update DOM.
              }
            )
          },
          (err) => {
            console.log('Error: %s', err);
          },
          () => {
            console.log('Completed');
          });

      this.ticking = false;
    }


    @HostListener('window:scroll', ['$event.target'])
    triggeredScroll(target) {
      this.lastKnownScrollPosition = window.scrollY;

      if (!this.ticking) {
        window.requestAnimationFrame(this.isElementCloseToTop.bind(this, target));
      }

      this.ticking = true;
    }
}

我的代码有 2 个问题: 1) BehaviorSubject 中存在类型错误 Observable.filter 只会迭代 Observables 而不是 Observables 值。

2) 我能够使用运算符 .mergeMap 清理它 TL;DR: Map values from source to inner observable, merge output

var scrolling = Observable.fromEvent(target, 'scroll')
          .mergeMap(event => this.scrollElementsStore.elementUpdated.filter((event) => {
            var offset = event + this.lastKnownScrollPosition;
            console.log(event);
            return offset < 10 && offset > 0;
          }
        )).subscribe((x)=>{
          console.log(x, 'asdfasdf');
        })