缓冲直到 redux-observable 和 rxjs

Buffer until with redux-observable and rxjs

是否有一种模式可以使 redux-observable 史诗缓冲区直到存储中的值为真?

const uploadsCompleteEpic = (action$, store) => {
  return action$
    .ofType(actionTypes.UPLOAD_SUCCESS)
    .bufferWhen(store => store.getState().remaining -1 === 0)
    .do(action => console.log(action))
    .ignoreElements();
};

上面的方法不起作用,因为 bufferWhen 中的函数不是 Observable。我也试过在 Observable.single 中包装缓冲函数但没有成功。

您可以为此使用常规 buffer operator

public buffer(closingNotifier: Observable<any>): Observable<T[]>

Collects values from the past as an array, and emits that array only when the closingNotifier Observable emits.

const uploadsCompleteEpic = (action$, store) => {
  // just used as a notifier, so we don't actually
  // care about what the value is, just when.
  // Since the state is only ever updated when an
  // action is received we can use that time to check.
  const notification$ = action$.filter(() => {
    return store.getState().remaining - 1 === 0;
  );

  return action$
    .ofType(actionTypes.UPLOAD_SUCCESS)
    .buffer(notification$)
    // Use this if you want to exclude empty buffers
    // .filter(buffer => buffer.length > 0) 
    .do(action => console.log(action))
    .ignoreElements();
};

我不知道 remaining 值在什么具体情况下会发生变化或为零,因此您肯定需要进行额外的检查或等待,以免发生竞争。例如排除空缓冲区,如果最终可能的话。