在 RxJava2 中创建自定义运算符?

Creating Custom Operators in RxJava2?

我找不到如何使用 RxJava 2 制作自定义运算符的示例。我考虑了几种方法:

  1. 使用 Observable.create,然后 flatMap 从源可观察到它。我可以让它工作,但感觉不太对劲。我最终创建了一个静态函数,我提供源 Observable,然后在源上提供 flatMap。在 OnSubscribe 中,我然后实例化一个对象,我将发射器传递给该对象,它处理和管理 Observable / Emitter(因为它不是微不足道的,我希望尽可能封装所有内容)。
  2. 正在创建 ObservableOperator 并将其提供给 Observable.lift。我找不到 RxJava 2 的任何示例。我不得不调试自己的示例以确保我对上游和下游的理解是正确的。因为我找不到关于 RxJava 2 的任何示例或文档,所以我有点担心我可能会不小心做一些我不应该做的事情。
  3. 创建我自己的 Observable 类型。这似乎是底层运算符的工作方式,其中许多扩展 AbstractObservableWithUpstream。虽然这里发生了很多事情,但似乎很容易错过某些事情或做一些我不应该做的事情。我不确定我是否应该采用这样的方法。我逐步完成了心理过程,它似乎很快就会变得毛茸茸。

我将继续使用选项 #2,但我认为值得询问 RxJava2 中支持的方法是什么,并找出是否有任何文档或示例。

不建议初学者编写运算符,许多所需的流模式可以通过现有运算符实现。

你看过 RxJava 关于 writing operators for 2.x 的 wiki 吗?我建议从上到下阅读。

  1. 使用 create() 是可能的,但大多数人使用它来发出带有 for-each 循环的 List 的元素,而没有意识到 Flowable.fromIterable 会这样做。
  2. 尽管 RxJava 2 运算符本身不使用 lift(),但我们保留了这个扩展点。如果你想避免使用选项 3 的一些样板文件。那么你可以尝试 this route.
  3. 这就是 RxJava 2 运算符的实现方式。 AbstractObservableWithUpstream 是一个小便利,对于 external implementors 不是必需的。

这可能对您有所帮助。我实现了 operator RxJava2 来处理 APiError。我用了电梯操作员。

参见示例。

  public final class ApiClient implements ApiClientInterface {
    ...
      @NonNull
      @Override
      public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
          return myApiService.activate(email, emailData)
                  .lift(getApiErrorTransformer())
                  .subscribeOn(Schedulers.io());
      }

      private <T>ApiErrorOperator<T> getApiErrorTransformer() {
          return new ApiErrorOperator<>(gson, networkService);
      }

  }

然后你可以找到自定义运算符

    public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {
        private static final String TAG = "ApiErrorOperator";
        private final Gson gson;
        private final NetworkService networkService;

        public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
            this.gson = gson;
            this.networkService = networkService;
        }

        @Override
        public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
            return new Observer<T>() {
                @Override
                public void onSubscribe(Disposable d) {
                    observer.onSubscribe(d);
                }

                @Override
                public void onNext(T value) {
                    observer.onNext(value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError", e);

                if (e instanceof HttpException) {
                        try {
                            HttpException error = (HttpException) e;
                            Response response = error.response();
                            String errorBody = response.errorBody().string();

                            ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class);
                            ApiException exception = new ApiException(errorResponse, response);

                            observer.onError(exception);
                        } catch (IOException exception) {
                            observer.onError(exception);
                        }

                    } else if (!networkService.isNetworkAvailable()) {
                        observer.onError(new NetworkException(ErrorResponse.builder()
                                .setErrorCode("")
                                .setDescription("No Network Connection Error")
                                .build()));
                    } else {
                        observer.onError(e);
                    }
                }

                @Override
                public void onComplete() {
                    observer.onComplete();
                }
            };
        }
    }