如果处理了内部路由中的异常 (Apache Camel),则不会执行具有聚合策略的 Splitter 之后的代码
Code after Splitter with aggregation strategy is not executed if exception in inner route were handled (Apache Camel)
我遇到过我无法理解的行为。当执行 Split with AggregationStrategy 并且在其中一次迭代期间发生异常时,会发生此问题。在另一个路由中的 Splitter 内部发生异常(每次迭代调用的直接端点)。似乎路由执行在 Splitter 之后停止。
这是示例代码。
这是一个为每个客户建立一个报告并收集文件名称用于内部统计的路线。
@Component
@RequiredArgsConstructor
@FieldDefaults(level = PRIVATE, makeFinal = true)
public class ReportRouteBuilder extends RouteBuilder {
ClientRepository clientRepository;
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("direct:handleError")); //handles an error, adds error message to internal error collector for statistic and writes log
from("direct:generateReports")
.setProperty("reportTask", body()) //at this point there is in the body an object of type ReportTask, containig all data required for building report
.bean(clientRepository, "getAllClients") // Body is a List<Client>
.split(body())
.aggregationStrategy(new FileNamesListAggregationStrategy())
.to("direct:generateReportForClient") // creates report which is saved in the file system. uses the same error handler
.end()
//when an exception occurs during split then code after splitter is not executed
.log("Finished generating reports. Files created ${body}"); // Body has to be List<String> with file names.
}
}
AggregationStrategy 非常简单 - 它只是提取文件的名称。如果 header 不存在,它 returns NULL.
public class FileNamesListAggregationStrategy extends AbstractListAggregationStrategy<String> {
@Override
public String getValue(Exchange exchange) {
Message inMessage = exchange.getIn();
return inMessage.getHeader(Exchange.FILE_NAME, String.class);
}
}
当拆分后一切顺利时,Body 列表中会出现所有文件名。但是当在路由“direct:generateReportForClient”中发生了一些异常(我为一个客户端添加了错误模拟)比聚合 body 只包含少一个文件名 - 没关系(一切都正确聚合)。
但是在 Split 之后路由执行停止并且此时 body 中的结果(带有文件名的列表)被返回到客户端(FluentProducer),它期望 ReportTask 作为响应 body.
并尝试将值 - 列表(聚合结果)转换为 ReportTask 并导致 org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type
为什么拆分后路由中断?已处理所有错误并正确完成聚合。
PS 我读过 Camel In Action 一书和关于 Splitter 的文档,但我还没有找到答案。
PPS 项目在 Spring Boot 2.3.1 和 Camel 3.3.0
上运行
更新
此路由由 FluentProducerTemplate
启动
ReportTask processedReportTask = producer.to("direct:generateReports")
.withBody(reportTask)
.request(ReportTask.class);
问题是拆分中的错误处理程序 + 自定义聚合策略。
来自 Camel in Action 书 (5.3.5):
WARNING When using a custom AggregationStrategy with the Splitter,
it’s important to know that you’re responsible for handling
exceptions. If you don’t propagate the exception back, the Splitter
will assume you’ve handled the exception and will ignore it.
在您的代码中,您使用了从 AbstractListAggregationStrategy
扩展而来的聚合策略。让我们看看 AbstractListAggregationStrategy
:
中的 aggregate
方法
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
List<V> list;
if (oldExchange == null) {
list = getList(newExchange);
} else {
list = getList(oldExchange);
}
if (newExchange != null) {
V value = getValue(newExchange);
if (value != null) {
list.add(value);
}
}
return oldExchange != null ? oldExchange : newExchange;
}
如果第一次交换是由错误处理程序处理的,我们将在结果交换 (newExchange
) 中拥有由错误处理程序 (Exchange.EXCEPTION_CAUGHT, Exchange.FAILURE_ENDPOINT, Exchange.ERRORHANDLER_HANDLED and Exchange.FAILURE_HANDLED
) 和 exchange.errorHandlerHandled=true
设置的属性数。 getErrorHandlerHandled()/setErrorHandlerHandled(Boolean errorHandlerHandled)
方法在 ExtendedExchange
界面中可用。
在这种情况下,您的拆分以与 errorHandlerHandled=true
的交换结束并中断了路线。
原因在camel exception clause manual
中有描述
If handled is true, then the thrown exception will be handled and
Camel will not continue routing in the original route, but break out.
要防止这种行为,您可以将交换转换为 ExtendedExchange
并在聚合策略 aggregate
方法中设置 errorHandlerHandled=false
。而你的路线不会断,还会继续。
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Exchange aggregatedExchange = super.aggregate(oldExchange, newExchange);
((ExtendedExchange) aggregatedExchange).setErrorHandlerHandled(false);
return aggregatedExchange;
}
棘手的情况是,如果您的交换由错误处理程序处理为而不是聚合策略中的第一个,您将不会遇到任何问题。因为 camel 将使用 first 交换(没有 errorHandlerHandled=true
)作为聚合的基础。
我遇到过我无法理解的行为。当执行 Split with AggregationStrategy 并且在其中一次迭代期间发生异常时,会发生此问题。在另一个路由中的 Splitter 内部发生异常(每次迭代调用的直接端点)。似乎路由执行在 Splitter 之后停止。
这是示例代码。
这是一个为每个客户建立一个报告并收集文件名称用于内部统计的路线。
@Component
@RequiredArgsConstructor
@FieldDefaults(level = PRIVATE, makeFinal = true)
public class ReportRouteBuilder extends RouteBuilder {
ClientRepository clientRepository;
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("direct:handleError")); //handles an error, adds error message to internal error collector for statistic and writes log
from("direct:generateReports")
.setProperty("reportTask", body()) //at this point there is in the body an object of type ReportTask, containig all data required for building report
.bean(clientRepository, "getAllClients") // Body is a List<Client>
.split(body())
.aggregationStrategy(new FileNamesListAggregationStrategy())
.to("direct:generateReportForClient") // creates report which is saved in the file system. uses the same error handler
.end()
//when an exception occurs during split then code after splitter is not executed
.log("Finished generating reports. Files created ${body}"); // Body has to be List<String> with file names.
}
}
AggregationStrategy 非常简单 - 它只是提取文件的名称。如果 header 不存在,它 returns NULL.
public class FileNamesListAggregationStrategy extends AbstractListAggregationStrategy<String> {
@Override
public String getValue(Exchange exchange) {
Message inMessage = exchange.getIn();
return inMessage.getHeader(Exchange.FILE_NAME, String.class);
}
}
当拆分后一切顺利时,Body 列表中会出现所有文件名。但是当在路由“direct:generateReportForClient”中发生了一些异常(我为一个客户端添加了错误模拟)比聚合 body 只包含少一个文件名 - 没关系(一切都正确聚合)。
但是在 Split 之后路由执行停止并且此时 body 中的结果(带有文件名的列表)被返回到客户端(FluentProducer),它期望 ReportTask 作为响应 body.
并尝试将值 - 列表(聚合结果)转换为 ReportTask 并导致 org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type
为什么拆分后路由中断?已处理所有错误并正确完成聚合。
PS 我读过 Camel In Action 一书和关于 Splitter 的文档,但我还没有找到答案。
PPS 项目在 Spring Boot 2.3.1 和 Camel 3.3.0
上运行更新 此路由由 FluentProducerTemplate
启动 ReportTask processedReportTask = producer.to("direct:generateReports")
.withBody(reportTask)
.request(ReportTask.class);
问题是拆分中的错误处理程序 + 自定义聚合策略。
来自 Camel in Action 书 (5.3.5):
WARNING When using a custom AggregationStrategy with the Splitter, it’s important to know that you’re responsible for handling exceptions. If you don’t propagate the exception back, the Splitter will assume you’ve handled the exception and will ignore it.
在您的代码中,您使用了从 AbstractListAggregationStrategy
扩展而来的聚合策略。让我们看看 AbstractListAggregationStrategy
:
aggregate
方法
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
List<V> list;
if (oldExchange == null) {
list = getList(newExchange);
} else {
list = getList(oldExchange);
}
if (newExchange != null) {
V value = getValue(newExchange);
if (value != null) {
list.add(value);
}
}
return oldExchange != null ? oldExchange : newExchange;
}
如果第一次交换是由错误处理程序处理的,我们将在结果交换 (newExchange
) 中拥有由错误处理程序 (Exchange.EXCEPTION_CAUGHT, Exchange.FAILURE_ENDPOINT, Exchange.ERRORHANDLER_HANDLED and Exchange.FAILURE_HANDLED
) 和 exchange.errorHandlerHandled=true
设置的属性数。 getErrorHandlerHandled()/setErrorHandlerHandled(Boolean errorHandlerHandled)
方法在 ExtendedExchange
界面中可用。
在这种情况下,您的拆分以与 errorHandlerHandled=true
的交换结束并中断了路线。
原因在camel exception clause manual
中有描述If handled is true, then the thrown exception will be handled and Camel will not continue routing in the original route, but break out.
要防止这种行为,您可以将交换转换为 ExtendedExchange
并在聚合策略 aggregate
方法中设置 errorHandlerHandled=false
。而你的路线不会断,还会继续。
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Exchange aggregatedExchange = super.aggregate(oldExchange, newExchange);
((ExtendedExchange) aggregatedExchange).setErrorHandlerHandled(false);
return aggregatedExchange;
}
棘手的情况是,如果您的交换由错误处理程序处理为而不是聚合策略中的第一个,您将不会遇到任何问题。因为 camel 将使用 first 交换(没有 errorHandlerHandled=true
)作为聚合的基础。