Rxjs 如何获取在 concatMap 期间缓冲的所有值

Rxjs how to get all values that are buffered during a concatMap

考虑以下流:

interval(1000)
  .pipe(
    take(5),
    concatMap((val) => {
      console.log(val, 'process');
      return of(val).pipe(delay(3000));
    })
  )
  .subscribe((val) => console.log(val, 'emit'));

正如预期的那样,0 发出并命中了 concat 映射。我们看到 'process' 已记录。在此期间,1 和 2 已从源头发出。我想做的是说“现在我的 concatmap 已经完成,给我所有当前在流中的项目。”所以下一个发射将是 [1, 2],我们将看到“[1,2] 进程”。

我不确定如何实现。我已经尝试使用 buffer 并在每次 concatMap 发出时发出,但它从来没有在该缓冲区上获得初始发出,它使用计时器和竞赛进行黑客攻击,即使那样它也不能很好地工作。

这是我目前的解决方案:

const dequeueSignal$ = new Subject();

interval(5000)
  .pipe(
    buffer(dequeueSignal$),
    concatMap((val) => {
      console.log(val, 'process');
      return of(val).pipe(
        delay(getRandomInt(20000)),
        tap(() => dequeueSignal$.next(null))
      );
    })
  )
  .subscribe((val) => console.log(val, 'emit'));

dequeueSignal$.next(null)

这是我能得到的最接近的。这意味着如果缓冲区中没有任何内容,它仍然会发出并且循环继续。但是,这有很大的缺点:

因此,这感觉很老套和脆弱。是否有可用于创建此场景的运算符集?

如果我理解正确,我会这样处理。

首先我们隔离源流。考虑到我们使用 share 运算符来确保 source$ 流被我们稍后从 source$.

开始创建的其他 Observable 共享。
const source$ = interval(1000).pipe(share(), take(5));

然后我认为您在示例中放置的延迟表示由某种函数执行的某种处理,可能是对异步服务的调用。如果这是真的,那么我们可以有一个 process 函数,其中 returns 一个 Observable。这个函数的模拟版本可能是这个

function process(val) {
  return of(val).pipe(
    delay(3000),
    tap({
      next: (val) => console.log(val, "processed"),
    })
  );
}

现在,如果所有这些假设都成立,那么我们可以定义一个 dequeueSignal$ 主题,如您的示例中所示,然后定义 2 个不同的流,一个流仅采用 [=14 通知的第一个元素=] 和一个包含所有其他元素的流,像这样

const dequeueSignal$ = new Subject<any>();

const first$ = source$.pipe(
  first(),
  concatMap((val) => {
    return process(val).pipe(
      tap({
        complete: () => {
          dequeueSignal$.next(null);
        },
      })
    );
  })
);

const afterFirst$ = source$.pipe(skip(1)).pipe(
  buffer(dequeueSignal$),
  concatMap((val) => {
    return process(val).pipe(
      tap({
        complete: () => {
          dequeueSignal$.next(null);
        },
      })
    );
  })
);

dequeueSignal$用于触发释放buffer运算符存储的缓冲区。

dequeueSignal$first$ 流中被 nexted 一次,并且在 afterFirst$ 流通知的任何时候。

dequeueSignal$ 的通知触发缓冲项目的释放。

Here a stackblitz 显示代码。

可能更优雅的解决方案可以作为 mergeMap 运算符 code 的变体来实现,但它可能看起来有点复杂。

自定义运算符可以满足此要求,该运算符利用 concatMap 但会根据需要修改其行为。

自定义运算符必须保持一些状态:

  • 一个 bufferedNotifications 缓冲区,它存储从上游接收的元素,以防已经有一个正在运行的请求
  • a processing 布尔值,如果为真,表示当前正在处理一个请求,在 processing 期间收到的任何通知都会添加到 bufferedNotifications

重要的是要注意,自定义运算符是 returns 类型 (observable: Observable) => Observable.

的函数

说了这么多,我们来看一下自定义算子的代码

// the custom operator requires a function as input
// this function represents the actual processing that happens in the stream
// a type T is also added for better type checking and support
function concatBufferedMap<T>(processor: (val: T[]) => Observable<T[]>) {
  // as said above, the custom operator returns a function which expects 
  // an Obaservable as input and returns another Obaservable
  return (sourceObservable: Observable<T>) =>
    // the Observable to be returned is constructed using the Observable constructor
    new Observable<T[]>((subscriber) => {
      // this function will be called each time this
      // Observable is subscribed to.

      // here we define the variables holding the state
      // the state is initialized any time the Observable is subscribed 
      let bufferedNotifications = [] as T[];
      let processing = false;

      // the Observable returned by the custom operator transforms the
      // source observable via a pipe and then subscribes to it
      const subscription = sourceObservable
        .pipe(
          // the first thing we do when a value is notified by the 
          // source observable (i.e. by upstream) is to push the value
          // into the buffer
          tap((val) => {
            bufferedNotifications.push(val);
          }),
          // then we leverage concatMap to make sure that we concatenate
          // the various processings
          concatMap(() => {
            // if we are processing a previous request 
            // or if there are no items in the buffer 
            // just complete without notifying anything
            // concatMap requires that the value returned by the function
            // passed in as parameter is an Observable, so in this case
            // we return EMPTY, which is an Observable that does nothing
            // and immediately completes
            if (processing || bufferedNotifications.length === 0) {
              return EMPTY;
            }
            // otherwise, if there is no processing on fly and there are 
            // values in the buffer, we return the Observable created by
            // the processor function and we set the state so that it reflects
            // that we have started processing something
            processing = true;
            // we create a copy of the buffer so that the param passed
            // to the processor function is not altered by other potential
            // values which can be added to the buffer during the execution
            // of processor logic
            const _buffer = [...bufferedNotifications];
            // the buffer is also reset
            bufferedNotifications = [];
            return processor(_buffer).pipe(
              tap({
                complete: () => {
                  console.log("DONE concatMap");
                  // when processor completes its processing, the processing
                  // state variable is set to false
                  processing = false;
                },
              })
            );
          })
        )
        // in the subscription logic, we notify the subscriber (i.e. 
        // downstream) of the values notified by the processor logic
        .subscribe({
          next(value) {
            subscriber.next(value);
          },
          error(err) {
            // We need to make sure we're propagating our errors through.
            subscriber.error(err);
          },
          complete() {
            console.log("subscriber complete");
            subscriber.complete();
          },
        });

      // Return the teardown logic. This will be invoked when
      // the result errors, completes, or is unsubscribed.
      return () => {
        subscription.unsubscribe();
      };
    });
}

This stackblitz 显示代码如何工作以及应产生预期结果。