如何处理 redux-observable 中的异步函数?

How to handle async function in redux-observable?

我正在使用 RxJS 和 redux-observable。

我正在尝试读取史诗文件。就我而言,我必须在史诗中进行,因为其他一些史诗通过 expand 运算符多次触发此史诗 "unknown" 次。

但是由于 FileReader 是异步的,所以下面的代码不起作用。

处理这个问题的正确方法是什么,尤其是 RxJS 方法?谢谢

export const uploadAttachmentEpic = (action$, store) =>
  action$
    .ofType(UPLOAD_ATTACHMENT)
    .map(action => {
      const reader = new FileReader();

      reader.onload = () => {
        return {
          ...action,
          payload: {
            ...action.payload,
            base64: reader.result
          }
        }
      };

      reader.readAsDataURL(action.payload.file);
    })
    .mergeMap(action =>
      ajax
        .post( /* use action.payload.base64 */ )
        .map(uploadAttachmentSucceed)
        .catch(uploadAttachmentFailed)
    );

您的文件读取进程执行 return Observable。异步进程未正确处理。我建议首先创建一个 return 可观察的文件读取函数。然后将其附加到 flapMap()

  function readFile(file){
    let reader = new FileReader();
      return Observable.create(obs => {
        reader.onload = function (e) {
            obs.next(reader.result);
        };
        reader.onerror = obs.error;
    })
        reader.readAsDataURL(file);
   }

然后在您的代码中,您可以将其合并为 ..flatMap(file=>readFile(file))

粉丝的回答(截至撰写本文时)很好,但有一些重要的注意事项:

  • 它立即开始读取文件,而不是延迟读取。因此,即使在任何人订阅之前,只需调用 readFile(file) 即可启动它。这很容易出错,因为有人可能不会立即同步订阅它,然后 reader.onload 就会错过它。理想情况下,Observables 是完全惰性和可重复的工厂。

  • 它从不对观察者调用 obs.complete(),因此订阅可能会发生内存泄漏,因为它永远不会结束。

  • 观察者的方法没有绑定,所以reader.onerror = obs.error实际上不会起作用。相反,您需要 e => obs.error(e)obs.error.bind(obs) See here for reference on why

  • 取消订阅时不会中止读取。

以下是我的做法:

function readFile(file){
  // Could use Observable.create (same thing) but I
  // prefer this one because Observable.create is
  // not part of the TC39 proposal
  return new Observable(observer => {
    const reader = new FileReader();
    reader.onload = (e) => {
      observer.next(reader.result);
      // It's important to complete() otherwise this
      // subscription might get leaked because it
      // "never ends"
      observer.complete();
    };
    reader.onerror = e => observer.error(e);
    reader.readAsDataURL(file);

    // unsubscribe handler aka cleanup
    return () => {
      // LOADING state.
      // Calling abort() any other time
      // will throw an exception.
      if (reader.readyState === 1) {
        reader.abort();
      }
    };
  });
}

此模式几乎可以应用于任何 API,因此了解其工作原理非常方便。


希望范主不介意批评!我没有冒犯的意思,只是想分享知识。