在 RxJava2 中创建自定义运算符?
Creating Custom Operators in RxJava2?
我找不到如何使用 RxJava 2 制作自定义运算符的示例。我考虑了几种方法:
- 使用
Observable.create
,然后 flatMap
从源可观察到它。我可以让它工作,但感觉不太对劲。我最终创建了一个静态函数,我提供源 Observable
,然后在源上提供 flatMap。在 OnSubscribe 中,我然后实例化一个对象,我将发射器传递给该对象,它处理和管理 Observable / Emitter(因为它不是微不足道的,我希望尽可能封装所有内容)。
- 正在创建
ObservableOperator
并将其提供给 Observable.lift
。我找不到 RxJava 2 的任何示例。我不得不调试自己的示例以确保我对上游和下游的理解是正确的。因为我找不到关于 RxJava 2 的任何示例或文档,所以我有点担心我可能会不小心做一些我不应该做的事情。
- 创建我自己的
Observable
类型。这似乎是底层运算符的工作方式,其中许多扩展 AbstractObservableWithUpstream
。虽然这里发生了很多事情,但似乎很容易错过某些事情或做一些我不应该做的事情。我不确定我是否应该采用这样的方法。我逐步完成了心理过程,它似乎很快就会变得毛茸茸。
我将继续使用选项 #2,但我认为值得询问 RxJava2 中支持的方法是什么,并找出是否有任何文档或示例。
不建议初学者编写运算符,许多所需的流模式可以通过现有运算符实现。
你看过 RxJava 关于 writing operators for 2.x 的 wiki 吗?我建议从上到下阅读。
- 使用
create()
是可能的,但大多数人使用它来发出带有 for-each 循环的 List
的元素,而没有意识到 Flowable.fromIterable
会这样做。
- 尽管 RxJava 2 运算符本身不使用
lift()
,但我们保留了这个扩展点。如果你想避免使用选项 3 的一些样板文件。那么你可以尝试 this route.
- 这就是 RxJava 2 运算符的实现方式。
AbstractObservableWithUpstream
是一个小便利,对于 external implementors 不是必需的。
这可能对您有所帮助。我实现了 operator RxJava2 来处理 APiError。我用了电梯操作员。
参见示例。
public final class ApiClient implements ApiClientInterface {
...
@NonNull
@Override
public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
return myApiService.activate(email, emailData)
.lift(getApiErrorTransformer())
.subscribeOn(Schedulers.io());
}
private <T>ApiErrorOperator<T> getApiErrorTransformer() {
return new ApiErrorOperator<>(gson, networkService);
}
}
然后你可以找到自定义运算符
public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {
private static final String TAG = "ApiErrorOperator";
private final Gson gson;
private final NetworkService networkService;
public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
this.gson = gson;
this.networkService = networkService;
}
@Override
public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
return new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
observer.onSubscribe(d);
}
@Override
public void onNext(T value) {
observer.onNext(value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError", e);
if (e instanceof HttpException) {
try {
HttpException error = (HttpException) e;
Response response = error.response();
String errorBody = response.errorBody().string();
ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class);
ApiException exception = new ApiException(errorResponse, response);
observer.onError(exception);
} catch (IOException exception) {
observer.onError(exception);
}
} else if (!networkService.isNetworkAvailable()) {
observer.onError(new NetworkException(ErrorResponse.builder()
.setErrorCode("")
.setDescription("No Network Connection Error")
.build()));
} else {
observer.onError(e);
}
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
}
我找不到如何使用 RxJava 2 制作自定义运算符的示例。我考虑了几种方法:
- 使用
Observable.create
,然后flatMap
从源可观察到它。我可以让它工作,但感觉不太对劲。我最终创建了一个静态函数,我提供源Observable
,然后在源上提供 flatMap。在 OnSubscribe 中,我然后实例化一个对象,我将发射器传递给该对象,它处理和管理 Observable / Emitter(因为它不是微不足道的,我希望尽可能封装所有内容)。 - 正在创建
ObservableOperator
并将其提供给Observable.lift
。我找不到 RxJava 2 的任何示例。我不得不调试自己的示例以确保我对上游和下游的理解是正确的。因为我找不到关于 RxJava 2 的任何示例或文档,所以我有点担心我可能会不小心做一些我不应该做的事情。 - 创建我自己的
Observable
类型。这似乎是底层运算符的工作方式,其中许多扩展AbstractObservableWithUpstream
。虽然这里发生了很多事情,但似乎很容易错过某些事情或做一些我不应该做的事情。我不确定我是否应该采用这样的方法。我逐步完成了心理过程,它似乎很快就会变得毛茸茸。
我将继续使用选项 #2,但我认为值得询问 RxJava2 中支持的方法是什么,并找出是否有任何文档或示例。
不建议初学者编写运算符,许多所需的流模式可以通过现有运算符实现。
你看过 RxJava 关于 writing operators for 2.x 的 wiki 吗?我建议从上到下阅读。
- 使用
create()
是可能的,但大多数人使用它来发出带有 for-each 循环的List
的元素,而没有意识到Flowable.fromIterable
会这样做。 - 尽管 RxJava 2 运算符本身不使用
lift()
,但我们保留了这个扩展点。如果你想避免使用选项 3 的一些样板文件。那么你可以尝试 this route. - 这就是 RxJava 2 运算符的实现方式。
AbstractObservableWithUpstream
是一个小便利,对于 external implementors 不是必需的。
这可能对您有所帮助。我实现了 operator RxJava2 来处理 APiError。我用了电梯操作员。
参见示例。
public final class ApiClient implements ApiClientInterface {
...
@NonNull
@Override
public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
return myApiService.activate(email, emailData)
.lift(getApiErrorTransformer())
.subscribeOn(Schedulers.io());
}
private <T>ApiErrorOperator<T> getApiErrorTransformer() {
return new ApiErrorOperator<>(gson, networkService);
}
}
然后你可以找到自定义运算符
public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {
private static final String TAG = "ApiErrorOperator";
private final Gson gson;
private final NetworkService networkService;
public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
this.gson = gson;
this.networkService = networkService;
}
@Override
public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
return new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
observer.onSubscribe(d);
}
@Override
public void onNext(T value) {
observer.onNext(value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError", e);
if (e instanceof HttpException) {
try {
HttpException error = (HttpException) e;
Response response = error.response();
String errorBody = response.errorBody().string();
ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class);
ApiException exception = new ApiException(errorResponse, response);
observer.onError(exception);
} catch (IOException exception) {
observer.onError(exception);
}
} else if (!networkService.isNetworkAvailable()) {
observer.onError(new NetworkException(ErrorResponse.builder()
.setErrorCode("")
.setDescription("No Network Connection Error")
.build()));
} else {
observer.onError(e);
}
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
}