如何处理来自 WebClient 的异步响应?

How to process async response from WebClient?

我正在尝试使用 WebClient 发出大量 Http POST 请求。 我将日志记录级别设置为 reactor.ipc.netty 以调试以查看正在发送的请求。

这是一个有效的代码:

@Service
public class HttpService implements IHttpService {

    private static final String URL = "http://blablabla.com/bla";

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    WebClient webClient;

    @Override
    public void push(Data data) {
        String body = constructString(data);
        Mono<ClientResponse> res = webClient.post()
                .uri(URL + getLogType(data))
                .contentLength(body.length())
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(body)
                .exchange();
        ClientResponse resp = res.block();
        logger.debug("Status : " + resp.statusCode());
        logger.debug("Body : " + resp.bodyToMono(String.class));
    }
}

它产生那种日志:

2018-05-16 15:54:14.642 DEBUG 19144 --- [ctor-http-nio-4] r.i.n.channel.ChannelOperationsHandler   : [id: 0x439f7819, L:/127.0.0.1:56556 - R:blablabla.com/127.0.0.1:8069] Writing object DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
POST /bla HTTP/1.1
user-agent: ReactorNetty/0.7.7.RELEASE
host: blablabla.com/bla:8069
accept: */*
accept-encoding: gzip
Content-Length: 494
Content-Type: application/json

但是当我删除 ClientResponse resp = res.block(); 我再也看不到日志了...所以我什至不知道请求是否被处理。 收到回复后如何处理? 我试过 res.doOnSuccess(clientResponse -> logger.debug("Code : " + clientResponse.statusCode())); 但没有成功...

因为 FluxMono 是 Reactive Streams 类型,它们也是惰性的:在你 subscribe 之前什么都不会发生。有几种方法可以实现该目标,subscribeblock 就是其中之一。

通常,Spring WebFlux 应用程序不会 subscribe 直接 return 那些反应类型;如果您的情况没有更多背景信息,我真的不能说这里的正确方法是什么。

查看 Reactor project reference documentation on Flux and Mono,这应该可以帮助您了解其背后的核心原则。