在 Spring 集成中使用 WebFlux 出站网关进行错误处理

Error handling with WebFlux Outbound Gateway in Spring Integration

我正在尝试了解在 spring webflux 集成中应如何处理出站网关中的错误。

在 spring 集成 without webflux int-http:outbound-gatewayerror-handler 如下所示:

<int-http:outbound-gateway
        http-method="GET"
        url-expression="url"
        expected-response-type="java.lang.String"
        error-handler="accessErrorHandler"
        header-mapper="headerMapper"
        />

但在 spring 集成 with webflux int-webflux:outbound-gateway 没有错误处理程序

<int-webflux:outbound-gateway
                http-method="GET"
                url-expression="url"
                expected-response-type="java.lang.String"
                header-mapper="headerMapper"
        />

这是我对 pom.xml 的依赖:

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-webflux</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Spring 集成 HTTP 模块完全基于来自 Spring Web 的 RestTemplate。那个有提到的 ErrorHandler 用于它的 同步 请求。

Spring 集成 WebFlux 模块完全基于 Spring WebFlux 基础的 non-blocking WebClient。内部逻辑基于 Project Reactor 类型,例如 FluxMono。为了遵守反应流规范,WebFluxRequestExecutingMessageHandler 只是 returns 一个 Mono 用于响应。 如果在与服务器交互期间出现一些错误,我们会在此处提供:

requestSpec.exchange()
                    .flatMap(response -> {
                        HttpStatus httpStatus = response.statusCode();
                        if (httpStatus.isError()) {
                            return response.body(BodyExtractors.toDataBuffers())
                                    .reduce(DataBuffer::write)
                                    .map(dataBuffer -> {
                                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                                        dataBuffer.read(bytes);
                                        DataBufferUtils.release(dataBuffer);
                                        return bytes;
                                    })
                                    .defaultIfEmpty(new byte[0])
                                    .map(bodyBytes -> {
                                                throw new WebClientResponseException(
                                                        "ClientResponse has erroneous status code: "
                                                                + httpStatus.value() + " "
                                                                + httpStatus.getReasonPhrase(),
                                                        httpStatus.value(),
                                                        httpStatus.getReasonPhrase(),
                                                        response.headers().asHttpHeaders(),
                                                        bodyBytes,
                                                        response.headers().contentType()
                                                                .map(MimeType::getCharset)
                                                                .orElse(StandardCharsets.ISO_8859_1));
                                            }
                                    );
                        }
                        else {
                            return Mono.just(response);
                        }
                    });

所以,一些 WebClientResponseException 将被扔进回复 Mono。 在任何反应式或 non-reactive 下游,这样的异常将被处理如下:

protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
    Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
    Throwable result = ex;
    if (!(ex instanceof MessagingException)) {
        result = new MessageHandlingException(requestMessage, ex);
    }
    if (errorChannel == null) {
        logger.error("Async exception received and no 'errorChannel' header exists and no default "
                + "'errorChannel' found", result);
    }
    else {
        try {
            sendOutput(new ErrorMessage(result), errorChannel, true);
        }
        catch (Exception e) {
            Exception exceptionToLog =
                    IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage,
                            () -> "failed to send error message in the [" + this + ']', e);
            logger.error("Failed to send async reply", exceptionToLog);
        }
    }
}

其中 errorChannel 从请求消息的 headers 中提取并回退到全局 IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME

有了所有这些,您应该订阅这样一个错误通道来处理这些错误 WebClientResponseException 个实例。 在 Spring 框架文档中查看有关 RestTemplate 的更多信息:https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#rest-client-access