具有 CompletableFuture 的 Vertx HTTPClient 阻止回调线程
Vertx HTTPClient with CompletableFuture block the callback Thread
我遇到了一个很奇怪的问题。
我正在处理 Vert.x,我正在使用 Vert.x 的 HttpClientRequest
从处理程序调用 REST APIs。现在我有一个 CompletableFuture
,我正在 HttpClientRequest
的响应处理程序中完成它。后来我用的是CompletableFuture.get()
。但是每当 get()
方法被调用时,主线程就会被阻塞(正如预期的那样),但它会永远被阻塞。我没有看到回调发生在我的响应处理程序上,它永远被卡住了。
代码如下:
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.util.concurrent.CompletableFuture;
import io.vertx.core.http.HttpClient;
CompletableFuture<JsonObject> comp = new CompletableFuture<JsonObject>();
HttpClient httpClient = new HttpClient(); //This object initialized and set the endpoit, port and domain name.
HttpClientRequest request = httpClient.request(HttpMethod.POST, requestURI, response -> {
response.bodyHandler(body -> {
//do some process
comp.complete(new JsonObject(body);
});
}).exceptionHandler(e -> {
//log the error
comp.completeExceptionally(e);
});
request.end();
//after some process
comp.get(); // here main thread is stuck forever.
我的 API 给出了 200 个响应,我在其中看到了 Wireshark 并且如果我这样做 comp.thenAccept()
则执行回调并给出我的结果。
为什么会这样,解决方法是什么?
注意:我知道不建议使用 Completable.get()
方法,但在我的用例中,我必须使用它。
这是给我问题的示例代码:
package io.vertx.starter;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.http.*;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import java.util.concurrent.CompletableFuture;
public class SampleVerticle extends AbstractVerticle {
public void start ( Future startFuture ) throws Exception {
Future<Void> future = Future.future ();
HttpServer server = vertx.createHttpServer ();
Router router = Router.router (vertx);
router.get ("/sample").handler (this::sampeHandler);
router.get ("/testcompletableblocking").handler (this::testCompBlocking);
router.get ("/testcompletablenonblocking").handler (this::testCompNonBlocking);
server.requestHandler (router::accept) // <5>
.listen (8080, ar -> { // <6>
if (ar.succeeded ()) {
System.out.println ("Server started");
future.complete ();
} else {
System.out.println ("Server is not started");
future.fail (ar.cause ());
}
});
}
private void sampeHandler ( RoutingContext context ) {
try {
Thread.sleep (1000);
} catch (Exception e) {
}
String response = "Hello...";
context.response ().setStatusCode (200).putHeader ("content-type", "text/html").end (response);
}
private void testCompBlocking ( RoutingContext context ) {
System.out.println ("Calling testCompBlocking....");
HttpClientOptions clientOptions = new HttpClientOptions ().setDefaultHost ("localhost").setDefaultPort (8080).setSsl (false).setKeepAlive (true);
HttpClient client = vertx.createHttpClient (clientOptions);
String requestURI = "/sample";
CompletableFuture<String> comp = new CompletableFuture<> ();
HttpClientRequest request = client.request (HttpMethod.GET, requestURI, response -> {
response.bodyHandler (body -> {
String kmsResponse = new String (body.getBytes ());
System.out.println ("kmsResponse-" + kmsResponse);
comp.complete (kmsResponse);
});
}).exceptionHandler (e -> {
e.printStackTrace ();
comp.completeExceptionally (e);
});
request.end ();
String result = "Not Success";
try {
result = comp.get ();
} catch (Exception e) {
System.out.println ("Exception in getting from Completable..." + e.getMessage ());
e.printStackTrace ();
}
context.response ().setStatusCode (200);
context.response ().putHeader ("content-type", "text/html");
context.response ().end (result);
System.out.println ("end testCompBlocking....");
}
private void testCompNonBlocking ( RoutingContext context ) {
System.out.println ("Calling testCompNonBlocking....");
HttpClientOptions clientOptions = new HttpClientOptions ().setDefaultHost ("localhost").setDefaultPort (8080).setKeepAlive (false);
HttpClient client = vertx.createHttpClient (clientOptions);
String requestURI = "/sample";
CompletableFuture<String> comp = new CompletableFuture<> ();
HttpClientRequest request = client.request (HttpMethod.GET, requestURI, response -> {
response.bodyHandler (body -> {
String kmsResponse = new String (body.getBytes ());
System.out.println ("kmsResponse-" + kmsResponse);
comp.complete (kmsResponse);
});
}).exceptionHandler (e -> {
e.printStackTrace ();
comp.completeExceptionally (e);
});
request.end ();
String result = "Not Blocking, please see result at Console";
try {
comp.thenAccept (apiResult -> System.out.println ("apiResult from CompletableFuture - " + apiResult));
} catch (Exception e) {
System.out.println ("Exception in getting from Completable..." + e.getMessage ());
e.printStackTrace ();
}
context.response ().setStatusCode (200);
context.response ().putHeader ("content-type", "text/html");
context.response ().end (result);
System.out.println ("end testCompNonBlocking....");
}
}
调用localhost:8080/testcompletableblocking
,没有发送响应,当前线程被永远阻塞。
get()
阻塞主线程直到 future 完成,但是,HttpClientRequest 是在主线程上执行的,因此这种情况会导致死锁。
相反,thenAccept()
是非阻塞的,仅创建一个回调,在 future 完成时执行。
根据您提供的代码,您的用例不明确;您分别使用 HttpClient
和 CompletableFuture
而不是 WebClient
and Future
是有原因的吗?
如果您需要 使用 CompletableFuture,那么您应该查看 this 项目以获得更 Vert.x 兼容的实现。
您的实施的问题在于它违反了 The Golden Rule - Don’t Block the Event Loop。你不应该在事件循环中调用像 CompletableFuture.get()
这样的阻塞操作。同样,sampleHandler()
也不应该在事件循环中调用 Thread.sleep()
,但这是一个较小的问题。
结果是你的事件循环现在被阻塞了……所以你的 /sample
请求不能再被处理。由于请求未被处理,您 CompletableFuture
仍未完成……死锁。
这个问题有两种可能的解决方案:
按设计使用 CompletableFuture
,依赖链式调用而不是 get()
,尽管它不强制执行 Vert.x 的线程模型。例如:
comp.whenComplete((result, e) -> {
System.out.println("Got sample response");
if (e != null) {
context.response().setStatusCode(500)
.end(e.getMessage());
} else {
context.response().setStatusCode(200)
.putHeader("content-type", "text/html")
.end(result);
}
System.out.println("end testCompBlocking....");
});
Use Vert.x facilities 用于 运行 阻塞代码。 CompletableFuture
不需要,但其他 API 可能需要。例如:
context.vertx().<String>executeBlocking(future -> {
String result = "Not Success";
try {
result = comp.get();
} catch (Exception e) {
System.out.println("Exception in getting from Completable..." + e.getMessage());
e.printStackTrace();
}
future.complete(result);
},
false,
result -> {
context.response().setStatusCode(200);
context.response().putHeader("content-type", "text/html");
context.response().end(result.result());
System.out.println("end testCompBlocking....");
});
我遇到了一个很奇怪的问题。
我正在处理 Vert.x,我正在使用 Vert.x 的 HttpClientRequest
从处理程序调用 REST APIs。现在我有一个 CompletableFuture
,我正在 HttpClientRequest
的响应处理程序中完成它。后来我用的是CompletableFuture.get()
。但是每当 get()
方法被调用时,主线程就会被阻塞(正如预期的那样),但它会永远被阻塞。我没有看到回调发生在我的响应处理程序上,它永远被卡住了。
代码如下:
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.util.concurrent.CompletableFuture;
import io.vertx.core.http.HttpClient;
CompletableFuture<JsonObject> comp = new CompletableFuture<JsonObject>();
HttpClient httpClient = new HttpClient(); //This object initialized and set the endpoit, port and domain name.
HttpClientRequest request = httpClient.request(HttpMethod.POST, requestURI, response -> {
response.bodyHandler(body -> {
//do some process
comp.complete(new JsonObject(body);
});
}).exceptionHandler(e -> {
//log the error
comp.completeExceptionally(e);
});
request.end();
//after some process
comp.get(); // here main thread is stuck forever.
我的 API 给出了 200 个响应,我在其中看到了 Wireshark 并且如果我这样做 comp.thenAccept()
则执行回调并给出我的结果。
为什么会这样,解决方法是什么?
注意:我知道不建议使用 Completable.get()
方法,但在我的用例中,我必须使用它。
这是给我问题的示例代码:
package io.vertx.starter;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.http.*;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import java.util.concurrent.CompletableFuture;
public class SampleVerticle extends AbstractVerticle {
public void start ( Future startFuture ) throws Exception {
Future<Void> future = Future.future ();
HttpServer server = vertx.createHttpServer ();
Router router = Router.router (vertx);
router.get ("/sample").handler (this::sampeHandler);
router.get ("/testcompletableblocking").handler (this::testCompBlocking);
router.get ("/testcompletablenonblocking").handler (this::testCompNonBlocking);
server.requestHandler (router::accept) // <5>
.listen (8080, ar -> { // <6>
if (ar.succeeded ()) {
System.out.println ("Server started");
future.complete ();
} else {
System.out.println ("Server is not started");
future.fail (ar.cause ());
}
});
}
private void sampeHandler ( RoutingContext context ) {
try {
Thread.sleep (1000);
} catch (Exception e) {
}
String response = "Hello...";
context.response ().setStatusCode (200).putHeader ("content-type", "text/html").end (response);
}
private void testCompBlocking ( RoutingContext context ) {
System.out.println ("Calling testCompBlocking....");
HttpClientOptions clientOptions = new HttpClientOptions ().setDefaultHost ("localhost").setDefaultPort (8080).setSsl (false).setKeepAlive (true);
HttpClient client = vertx.createHttpClient (clientOptions);
String requestURI = "/sample";
CompletableFuture<String> comp = new CompletableFuture<> ();
HttpClientRequest request = client.request (HttpMethod.GET, requestURI, response -> {
response.bodyHandler (body -> {
String kmsResponse = new String (body.getBytes ());
System.out.println ("kmsResponse-" + kmsResponse);
comp.complete (kmsResponse);
});
}).exceptionHandler (e -> {
e.printStackTrace ();
comp.completeExceptionally (e);
});
request.end ();
String result = "Not Success";
try {
result = comp.get ();
} catch (Exception e) {
System.out.println ("Exception in getting from Completable..." + e.getMessage ());
e.printStackTrace ();
}
context.response ().setStatusCode (200);
context.response ().putHeader ("content-type", "text/html");
context.response ().end (result);
System.out.println ("end testCompBlocking....");
}
private void testCompNonBlocking ( RoutingContext context ) {
System.out.println ("Calling testCompNonBlocking....");
HttpClientOptions clientOptions = new HttpClientOptions ().setDefaultHost ("localhost").setDefaultPort (8080).setKeepAlive (false);
HttpClient client = vertx.createHttpClient (clientOptions);
String requestURI = "/sample";
CompletableFuture<String> comp = new CompletableFuture<> ();
HttpClientRequest request = client.request (HttpMethod.GET, requestURI, response -> {
response.bodyHandler (body -> {
String kmsResponse = new String (body.getBytes ());
System.out.println ("kmsResponse-" + kmsResponse);
comp.complete (kmsResponse);
});
}).exceptionHandler (e -> {
e.printStackTrace ();
comp.completeExceptionally (e);
});
request.end ();
String result = "Not Blocking, please see result at Console";
try {
comp.thenAccept (apiResult -> System.out.println ("apiResult from CompletableFuture - " + apiResult));
} catch (Exception e) {
System.out.println ("Exception in getting from Completable..." + e.getMessage ());
e.printStackTrace ();
}
context.response ().setStatusCode (200);
context.response ().putHeader ("content-type", "text/html");
context.response ().end (result);
System.out.println ("end testCompNonBlocking....");
}
}
调用localhost:8080/testcompletableblocking
,没有发送响应,当前线程被永远阻塞。
get()
阻塞主线程直到 future 完成,但是,HttpClientRequest 是在主线程上执行的,因此这种情况会导致死锁。
相反,thenAccept()
是非阻塞的,仅创建一个回调,在 future 完成时执行。
根据您提供的代码,您的用例不明确;您分别使用 HttpClient
和 CompletableFuture
而不是 WebClient
and Future
是有原因的吗?
如果您需要 使用 CompletableFuture,那么您应该查看 this 项目以获得更 Vert.x 兼容的实现。
您的实施的问题在于它违反了 The Golden Rule - Don’t Block the Event Loop。你不应该在事件循环中调用像 CompletableFuture.get()
这样的阻塞操作。同样,sampleHandler()
也不应该在事件循环中调用 Thread.sleep()
,但这是一个较小的问题。
结果是你的事件循环现在被阻塞了……所以你的 /sample
请求不能再被处理。由于请求未被处理,您 CompletableFuture
仍未完成……死锁。
这个问题有两种可能的解决方案:
按设计使用
CompletableFuture
,依赖链式调用而不是get()
,尽管它不强制执行 Vert.x 的线程模型。例如:comp.whenComplete((result, e) -> { System.out.println("Got sample response"); if (e != null) { context.response().setStatusCode(500) .end(e.getMessage()); } else { context.response().setStatusCode(200) .putHeader("content-type", "text/html") .end(result); } System.out.println("end testCompBlocking...."); });
Use Vert.x facilities 用于 运行 阻塞代码。
CompletableFuture
不需要,但其他 API 可能需要。例如:context.vertx().<String>executeBlocking(future -> { String result = "Not Success"; try { result = comp.get(); } catch (Exception e) { System.out.println("Exception in getting from Completable..." + e.getMessage()); e.printStackTrace(); } future.complete(result); }, false, result -> { context.response().setStatusCode(200); context.response().putHeader("content-type", "text/html"); context.response().end(result.result()); System.out.println("end testCompBlocking...."); });