在 Spring 集成中使用 WebFlux 出站网关进行错误处理
Error handling with WebFlux Outbound Gateway in Spring Integration
我正在尝试了解在 spring webflux 集成中应如何处理出站网关中的错误。
在 spring 集成 without webflux int-http:outbound-gateway 有 error-handler 如下所示:
<int-http:outbound-gateway
http-method="GET"
url-expression="url"
expected-response-type="java.lang.String"
error-handler="accessErrorHandler"
header-mapper="headerMapper"
/>
但在 spring 集成 with webflux int-webflux:outbound-gateway 没有错误处理程序
<int-webflux:outbound-gateway
http-method="GET"
url-expression="url"
expected-response-type="java.lang.String"
header-mapper="headerMapper"
/>
这是我对 pom.xml 的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Spring 集成 HTTP 模块完全基于来自 Spring Web 的 RestTemplate
。那个有提到的 ErrorHandler
用于它的 同步 请求。
Spring 集成 WebFlux 模块完全基于 Spring WebFlux 基础的 non-blocking WebClient
。内部逻辑基于 Project Reactor 类型,例如 Flux
和 Mono
。为了遵守反应流规范,WebFluxRequestExecutingMessageHandler
只是 returns 一个 Mono
用于响应。
如果在与服务器交互期间出现一些错误,我们会在此处提供:
requestSpec.exchange()
.flatMap(response -> {
HttpStatus httpStatus = response.statusCode();
if (httpStatus.isError()) {
return response.body(BodyExtractors.toDataBuffers())
.reduce(DataBuffer::write)
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return bytes;
})
.defaultIfEmpty(new byte[0])
.map(bodyBytes -> {
throw new WebClientResponseException(
"ClientResponse has erroneous status code: "
+ httpStatus.value() + " "
+ httpStatus.getReasonPhrase(),
httpStatus.value(),
httpStatus.getReasonPhrase(),
response.headers().asHttpHeaders(),
bodyBytes,
response.headers().contentType()
.map(MimeType::getCharset)
.orElse(StandardCharsets.ISO_8859_1));
}
);
}
else {
return Mono.just(response);
}
});
所以,一些 WebClientResponseException
将被扔进回复 Mono
。
在任何反应式或 non-reactive 下游,这样的异常将被处理如下:
protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
Throwable result = ex;
if (!(ex instanceof MessagingException)) {
result = new MessageHandlingException(requestMessage, ex);
}
if (errorChannel == null) {
logger.error("Async exception received and no 'errorChannel' header exists and no default "
+ "'errorChannel' found", result);
}
else {
try {
sendOutput(new ErrorMessage(result), errorChannel, true);
}
catch (Exception e) {
Exception exceptionToLog =
IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage,
() -> "failed to send error message in the [" + this + ']', e);
logger.error("Failed to send async reply", exceptionToLog);
}
}
}
其中 errorChannel
从请求消息的 headers 中提取并回退到全局 IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME
。
有了所有这些,您应该订阅这样一个错误通道来处理这些错误
WebClientResponseException
个实例。
在 Spring 框架文档中查看有关 RestTemplate
的更多信息:https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#rest-client-access
我正在尝试了解在 spring webflux 集成中应如何处理出站网关中的错误。
在 spring 集成 without webflux int-http:outbound-gateway 有 error-handler 如下所示:
<int-http:outbound-gateway
http-method="GET"
url-expression="url"
expected-response-type="java.lang.String"
error-handler="accessErrorHandler"
header-mapper="headerMapper"
/>
但在 spring 集成 with webflux int-webflux:outbound-gateway 没有错误处理程序
<int-webflux:outbound-gateway
http-method="GET"
url-expression="url"
expected-response-type="java.lang.String"
header-mapper="headerMapper"
/>
这是我对 pom.xml 的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Spring 集成 HTTP 模块完全基于来自 Spring Web 的 RestTemplate
。那个有提到的 ErrorHandler
用于它的 同步 请求。
Spring 集成 WebFlux 模块完全基于 Spring WebFlux 基础的 non-blocking WebClient
。内部逻辑基于 Project Reactor 类型,例如 Flux
和 Mono
。为了遵守反应流规范,WebFluxRequestExecutingMessageHandler
只是 returns 一个 Mono
用于响应。
如果在与服务器交互期间出现一些错误,我们会在此处提供:
requestSpec.exchange()
.flatMap(response -> {
HttpStatus httpStatus = response.statusCode();
if (httpStatus.isError()) {
return response.body(BodyExtractors.toDataBuffers())
.reduce(DataBuffer::write)
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return bytes;
})
.defaultIfEmpty(new byte[0])
.map(bodyBytes -> {
throw new WebClientResponseException(
"ClientResponse has erroneous status code: "
+ httpStatus.value() + " "
+ httpStatus.getReasonPhrase(),
httpStatus.value(),
httpStatus.getReasonPhrase(),
response.headers().asHttpHeaders(),
bodyBytes,
response.headers().contentType()
.map(MimeType::getCharset)
.orElse(StandardCharsets.ISO_8859_1));
}
);
}
else {
return Mono.just(response);
}
});
所以,一些 WebClientResponseException
将被扔进回复 Mono
。
在任何反应式或 non-reactive 下游,这样的异常将被处理如下:
protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
Throwable result = ex;
if (!(ex instanceof MessagingException)) {
result = new MessageHandlingException(requestMessage, ex);
}
if (errorChannel == null) {
logger.error("Async exception received and no 'errorChannel' header exists and no default "
+ "'errorChannel' found", result);
}
else {
try {
sendOutput(new ErrorMessage(result), errorChannel, true);
}
catch (Exception e) {
Exception exceptionToLog =
IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage,
() -> "failed to send error message in the [" + this + ']', e);
logger.error("Failed to send async reply", exceptionToLog);
}
}
}
其中 errorChannel
从请求消息的 headers 中提取并回退到全局 IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME
。
有了所有这些,您应该订阅这样一个错误通道来处理这些错误
WebClientResponseException
个实例。
在 Spring 框架文档中查看有关 RestTemplate
的更多信息:https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#rest-client-access