如何在 playframework 中将 Observable 转换为 Promise java api
How to convert Observable to Promise in playframework java api
我的 promise 默认实现如下:
public F.Promise<Result> index() {
return F.Promise.promise(() -> intensiveComputationSync())
.map(result -> ok(String.valueOf(result)));
}
但现在我有 rx api:
private Observable<Integer> intensiveComputationObservable() {
return Observable.create(subscriber -> {
try {
subscriber.onNext(intensiveComputationSync());
} catch (Exception e) {
subscriber.onError(e);
}
subscriber.onCompleted();
});
}
我如何使用我的 rx api 和 promises?对正确的错误处理特别感兴趣。
看来我找到了解决办法。
用 promise 包装 observable 的函数:
public static <T> F.Promise<T> observableToPromise(Observable<T> obs) {
scala.concurrent.Promise<T> scalaPromise = scala.concurrent.Promise$.MODULE$.<T>apply();
obs.subscribe(
result -> scalaPromise.success(result),
throwable -> scalaPromise.failure(throwable)
);
return F.Promise.wrap(scalaPromise.future());
}
现在我的行动:
public F.Promise<Result> index() {
//convert observable to promise then map computation result to F.Promise<Result>
F.Promise<Result> succeedPromise =
observableToPromise(
//computation on another thread
intensiveComputationObservable().subscribeOn(Schedulers.newThread())
).map(result -> ok(String.valueOf(result)));
//if exception while computation thrown, we returns another status
return succeedPromise
.recoverWith(throwable -> F.Promise.pure(internalServerError("error")));
}
以下是在 Play 2.5 中执行此操作的方法:
public CompletableFuture<Result> callRPC() {
Observable<Object> result = <create observable>
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
result.subscribe((active) -> completableFuture.complete(active),
(err) -> completableFuture.completeExceptionally(err));
return completableFuture.thenApply( i -> ok(Json.toJson(i)));
}
我的 promise 默认实现如下:
public F.Promise<Result> index() {
return F.Promise.promise(() -> intensiveComputationSync())
.map(result -> ok(String.valueOf(result)));
}
但现在我有 rx api:
private Observable<Integer> intensiveComputationObservable() {
return Observable.create(subscriber -> {
try {
subscriber.onNext(intensiveComputationSync());
} catch (Exception e) {
subscriber.onError(e);
}
subscriber.onCompleted();
});
}
我如何使用我的 rx api 和 promises?对正确的错误处理特别感兴趣。
看来我找到了解决办法。
用 promise 包装 observable 的函数:
public static <T> F.Promise<T> observableToPromise(Observable<T> obs) {
scala.concurrent.Promise<T> scalaPromise = scala.concurrent.Promise$.MODULE$.<T>apply();
obs.subscribe(
result -> scalaPromise.success(result),
throwable -> scalaPromise.failure(throwable)
);
return F.Promise.wrap(scalaPromise.future());
}
现在我的行动:
public F.Promise<Result> index() {
//convert observable to promise then map computation result to F.Promise<Result>
F.Promise<Result> succeedPromise =
observableToPromise(
//computation on another thread
intensiveComputationObservable().subscribeOn(Schedulers.newThread())
).map(result -> ok(String.valueOf(result)));
//if exception while computation thrown, we returns another status
return succeedPromise
.recoverWith(throwable -> F.Promise.pure(internalServerError("error")));
}
以下是在 Play 2.5 中执行此操作的方法:
public CompletableFuture<Result> callRPC() {
Observable<Object> result = <create observable>
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
result.subscribe((active) -> completableFuture.complete(active),
(err) -> completableFuture.completeExceptionally(err));
return completableFuture.thenApply( i -> ok(Json.toJson(i)));
}