Mono 与 CompletableFuture
Mono vs CompletableFuture
CompletableFuture
在单独的线程上执行任务(使用线程池)并提供回调函数。假设我在 CompletableFuture
中有一个 API 调用。那是 API 呼叫阻塞吗?线程是否会被阻塞,直到它没有得到 API 的响应? (我知道主 thread/tomcat 线程将是非阻塞的,但是执行 CompletableFuture 任务的线程呢?)
据我所知,Mono 是完全非阻塞的。
请说明这一点,如果我错了请纠正我。
CompletableFuture 是异步的。但它是非阻塞的吗?
关于 CompletableFuture 的一个真实情况是它是真正的异步,它允许您从调用者线程异步地 运行 您的任务,并且 API 例如 thenXXX
允许您在结果可用时处理结果。另一方面,CompletableFuture
并不总是非阻塞的。例如,当你运行下面的代码时,它会在默认的ForkJoinPool
上异步执行:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
});
很明显ForkJoinPool
中的Thread
执行任务最终会阻塞,也就是说我们不能保证调用一定是非阻塞的
另一方面,CompletableFuture
公开了 API,这使您可以使其真正成为非阻塞的。
例如,您始终可以执行以下操作:
public CompletableFuture myNonBlockingHttpCall(Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
return uncompletedFuture;
}
如您所见,CompletableFuture
future 的 API 为您提供了 complete
和 completeExceptionally
方法,可以在需要时完成执行,而不会阻塞任何线程。
Mono 与 CompletableFuture
在上一节中,我们对 CF 行为进行了概述,但 CompletableFuture 和 Mono 的主要区别是什么?
值得一提的是,我们也可以阻塞 Mono。没有人阻止我们写以下内容:
Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
})
当然,一旦我们订阅了未来,调用线程就会被阻塞。但是我们总是可以通过提供额外的 subscribeOn
运算符来解决这个问题。然而,Mono
中更广泛的 API 并不是关键特征。
为了理解CompletableFuture
和Mono
之间的主要区别,让我们回到前面提到的myNonBlockingHttpCall
方法实现。
public CompletableFuture myUpperLevelBusinessLogic() {
var future = myNonBlockingHttpCall();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
var errorFuture = new CompletableFuture();
errorFuture.completeExceptionally(new RuntimeException());
return errorFuture;
}
return future;
}
在CompletableFuture
的情况下,一旦调用该方法,它就会急切地执行对另一个service/resource的HTTP调用。即使我们在验证某些 pre/post 条件后并不真正需要执行结果,它也会开始执行,并会为此工作分配额外的 CPU/DB-Connections/What-Ever-Machine-Resources。
相比之下,Mono
类型根据定义是惰性的:
public Mono myNonBlockingHttpCallWithMono(Object someData) {
return Mono.create(sink -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
})
});
}
public Mono myUpperLevelBusinessLogic() {
var mono = myNonBlockingHttpCallWithMono();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
return Mono.error(new RuntimeException());
}
return mono;
}
在这种情况下,在订阅最后一个 mono
之前不会发生任何事情。因此,只有当 myNonBlockingHttpCallWithMono
方法返回 Mono
时,才会被订阅,提供给 Mono.create(Consumer)
的逻辑才会被执行。
而且我们可以走得更远。我们可以让我们的执行更加懒惰。您可能知道,Mono
扩展了 Reactive Streams 规范的 Publisher
。 Reactive Streams 的尖叫特性是背压支持。因此,使用 Mono
API 我们可以仅在真正需要数据并且我们的订阅者准备好使用它们时才执行:
Mono.create(sink -> {
AtomicBoolean once = new AtomicBoolean();
sink.onRequest(__ -> {
if(!once.get() && once.compareAndSet(false, true) {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
});
}
});
});
在此示例中,我们仅在订阅者调用 Subscription#request
时才执行数据,因此通过这样做它声明其已准备好接收数据。
总结
CompletableFuture
是异步的,可以是非阻塞的
CompletableFuture
急切。你不能推迟执行。但是你可以取消它们(总比没有好)
Mono
是 async/non-blocking 并且可以通过将主 Mono
与不同的运算符组合起来轻松地对不同的 Thread
执行任何调用。
Mono
是真正的懒惰,允许通过订阅者的存在和它是否准备好使用数据来推迟执行启动。
在 Oleh 的回答的基础上,CompletableFuture
的一个可能的惰性解决方案是
public CompletableFuture myNonBlockingHttpCall(CompletableFuture<ExecutorService> dispatch, Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
dispatch.thenAccept(x -> x.submit(() -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
}));
return uncompletedFuture;
}
然后,稍后你只需做
dispatch.complete(executor);
这将使 CompletableFuture
等同于 Mono
,但我猜没有背压。
CompletableFuture
在单独的线程上执行任务(使用线程池)并提供回调函数。假设我在 CompletableFuture
中有一个 API 调用。那是 API 呼叫阻塞吗?线程是否会被阻塞,直到它没有得到 API 的响应? (我知道主 thread/tomcat 线程将是非阻塞的,但是执行 CompletableFuture 任务的线程呢?)
据我所知,Mono 是完全非阻塞的。
请说明这一点,如果我错了请纠正我。
CompletableFuture 是异步的。但它是非阻塞的吗?
关于 CompletableFuture 的一个真实情况是它是真正的异步,它允许您从调用者线程异步地 运行 您的任务,并且 API 例如 thenXXX
允许您在结果可用时处理结果。另一方面,CompletableFuture
并不总是非阻塞的。例如,当你运行下面的代码时,它会在默认的ForkJoinPool
上异步执行:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
});
很明显ForkJoinPool
中的Thread
执行任务最终会阻塞,也就是说我们不能保证调用一定是非阻塞的
另一方面,CompletableFuture
公开了 API,这使您可以使其真正成为非阻塞的。
例如,您始终可以执行以下操作:
public CompletableFuture myNonBlockingHttpCall(Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
return uncompletedFuture;
}
如您所见,CompletableFuture
future 的 API 为您提供了 complete
和 completeExceptionally
方法,可以在需要时完成执行,而不会阻塞任何线程。
Mono 与 CompletableFuture
在上一节中,我们对 CF 行为进行了概述,但 CompletableFuture 和 Mono 的主要区别是什么?
值得一提的是,我们也可以阻塞 Mono。没有人阻止我们写以下内容:
Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
})
当然,一旦我们订阅了未来,调用线程就会被阻塞。但是我们总是可以通过提供额外的 subscribeOn
运算符来解决这个问题。然而,Mono
中更广泛的 API 并不是关键特征。
为了理解CompletableFuture
和Mono
之间的主要区别,让我们回到前面提到的myNonBlockingHttpCall
方法实现。
public CompletableFuture myUpperLevelBusinessLogic() {
var future = myNonBlockingHttpCall();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
var errorFuture = new CompletableFuture();
errorFuture.completeExceptionally(new RuntimeException());
return errorFuture;
}
return future;
}
在CompletableFuture
的情况下,一旦调用该方法,它就会急切地执行对另一个service/resource的HTTP调用。即使我们在验证某些 pre/post 条件后并不真正需要执行结果,它也会开始执行,并会为此工作分配额外的 CPU/DB-Connections/What-Ever-Machine-Resources。
相比之下,Mono
类型根据定义是惰性的:
public Mono myNonBlockingHttpCallWithMono(Object someData) {
return Mono.create(sink -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
})
});
}
public Mono myUpperLevelBusinessLogic() {
var mono = myNonBlockingHttpCallWithMono();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
return Mono.error(new RuntimeException());
}
return mono;
}
在这种情况下,在订阅最后一个 mono
之前不会发生任何事情。因此,只有当 myNonBlockingHttpCallWithMono
方法返回 Mono
时,才会被订阅,提供给 Mono.create(Consumer)
的逻辑才会被执行。
而且我们可以走得更远。我们可以让我们的执行更加懒惰。您可能知道,Mono
扩展了 Reactive Streams 规范的 Publisher
。 Reactive Streams 的尖叫特性是背压支持。因此,使用 Mono
API 我们可以仅在真正需要数据并且我们的订阅者准备好使用它们时才执行:
Mono.create(sink -> {
AtomicBoolean once = new AtomicBoolean();
sink.onRequest(__ -> {
if(!once.get() && once.compareAndSet(false, true) {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
});
}
});
});
在此示例中,我们仅在订阅者调用 Subscription#request
时才执行数据,因此通过这样做它声明其已准备好接收数据。
总结
CompletableFuture
是异步的,可以是非阻塞的CompletableFuture
急切。你不能推迟执行。但是你可以取消它们(总比没有好)Mono
是 async/non-blocking 并且可以通过将主Mono
与不同的运算符组合起来轻松地对不同的Thread
执行任何调用。Mono
是真正的懒惰,允许通过订阅者的存在和它是否准备好使用数据来推迟执行启动。
在 Oleh 的回答的基础上,CompletableFuture
的一个可能的惰性解决方案是
public CompletableFuture myNonBlockingHttpCall(CompletableFuture<ExecutorService> dispatch, Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
dispatch.thenAccept(x -> x.submit(() -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
}));
return uncompletedFuture;
}
然后,稍后你只需做
dispatch.complete(executor);
这将使 CompletableFuture
等同于 Mono
,但我猜没有背压。