Apache 骆驼聚合给出异常
Apache camel aggregate gives exception
关于问题:
提示我无法通过拆分方法获取字符串列表,所以我尝试了如下聚合方法:
public class lowestRates implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
String oldStr = oldExchange.getIn().getBody(String.class);
String newStr = newExchange.getIn().getBody(String.class);
Pattern p = Pattern.compile("(\w+)\s(\d+)");
Matcher m1 = p.matcher(oldStr);
Matcher m2 = p.matcher(newStr);
String finalStr = "";
if(m1.group(2).equalsIgnoreCase(m2.group(2)))
finalStr = m1.group(1) + Integer.toString(Integer.parseInt(m1.group(2)) > Integer.parseInt(m2.group(2)) ? Integer.parseInt(m2.group(2)) : Integer.parseInt(m1.group(2)));
else
finalStr = oldStr + "\n" + newStr;
oldExchange.getIn().setBody(finalStr);
return oldExchange;
}
}
和新的主要代码:
from("file://files")
.split()
.tokenize("\n")
.aggregate(new lowestRates())
.body()
.completionTimeout(5000)
.to("file://files/result.txt")
但它给了我:
org.apache.camel.CamelExchangeException: Error occurred during aggregation. Exchange[][Message: Good1 450]. Caused by: [java.lang.NullPointerException - null]
现在的问题是如何编写正确的聚合方法,因为我看不出这里有什么错误:(。
String oldStr = oldExchange.getIn().getBody(String.class);
String newStr = newExchange.getIn().getBody(String.class);
您需要在此处检查字符串是否为 null..
对于第一条消息,oldStr 将为空。
恕我直言,您收到异常然后尝试解析空字符串。
是的,检查 null
的交换
if (oldExchange == null) {
return newExchange;
}
更新:
像这样尝试:
public class lowestRates implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
if (newExchange.getIn().getBody()!=null){
Pattern p = Pattern.compile("(\w+)\s(\d+)");
String finalStr = "";
String oldStr = oldExchange.getIn().getBody(String.class);
String newStr = newExchange.getIn().getBody(String.class);
if (oldStr!=null&&newStr!=null){
Matcher m1 = p.matcher(oldStr);
Matcher m2 = p.matcher(newStr);
if(m1.group(2).equalsIgnoreCase(m2.group(2)))
finalStr = m1.group(1) + Integer.toString(Integer.parseInt(m1.group(2)) > Integer.parseInt(m2.group(2)) ? Integer.parseInt(m2.group(2)) : Integer.parseInt(m1.group(2)));
else
finalStr = oldStr + "\n" + newStr;
}
oldExchange.getIn().setBody(finalStr);
}
return oldExchange;
}
}
还需要修改聚合如下,
发件人:
Aggregate(new strategy()).body() //This is where things go wrong
收件人:
Aggregate(constant(true), new strategy()).completionFromBatchConsumer()
关于问题:
提示我无法通过拆分方法获取字符串列表,所以我尝试了如下聚合方法:
public class lowestRates implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
String oldStr = oldExchange.getIn().getBody(String.class);
String newStr = newExchange.getIn().getBody(String.class);
Pattern p = Pattern.compile("(\w+)\s(\d+)");
Matcher m1 = p.matcher(oldStr);
Matcher m2 = p.matcher(newStr);
String finalStr = "";
if(m1.group(2).equalsIgnoreCase(m2.group(2)))
finalStr = m1.group(1) + Integer.toString(Integer.parseInt(m1.group(2)) > Integer.parseInt(m2.group(2)) ? Integer.parseInt(m2.group(2)) : Integer.parseInt(m1.group(2)));
else
finalStr = oldStr + "\n" + newStr;
oldExchange.getIn().setBody(finalStr);
return oldExchange;
}
}
和新的主要代码:
from("file://files")
.split()
.tokenize("\n")
.aggregate(new lowestRates())
.body()
.completionTimeout(5000)
.to("file://files/result.txt")
但它给了我:
org.apache.camel.CamelExchangeException: Error occurred during aggregation. Exchange[][Message: Good1 450]. Caused by: [java.lang.NullPointerException - null]
现在的问题是如何编写正确的聚合方法,因为我看不出这里有什么错误:(。
String oldStr = oldExchange.getIn().getBody(String.class);
String newStr = newExchange.getIn().getBody(String.class);
您需要在此处检查字符串是否为 null.. 对于第一条消息,oldStr 将为空。 恕我直言,您收到异常然后尝试解析空字符串。 是的,检查 null
的交换 if (oldExchange == null) {
return newExchange;
}
更新: 像这样尝试:
public class lowestRates implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
if (newExchange.getIn().getBody()!=null){
Pattern p = Pattern.compile("(\w+)\s(\d+)");
String finalStr = "";
String oldStr = oldExchange.getIn().getBody(String.class);
String newStr = newExchange.getIn().getBody(String.class);
if (oldStr!=null&&newStr!=null){
Matcher m1 = p.matcher(oldStr);
Matcher m2 = p.matcher(newStr);
if(m1.group(2).equalsIgnoreCase(m2.group(2)))
finalStr = m1.group(1) + Integer.toString(Integer.parseInt(m1.group(2)) > Integer.parseInt(m2.group(2)) ? Integer.parseInt(m2.group(2)) : Integer.parseInt(m1.group(2)));
else
finalStr = oldStr + "\n" + newStr;
}
oldExchange.getIn().setBody(finalStr);
}
return oldExchange;
}
}
还需要修改聚合如下,
发件人:
Aggregate(new strategy()).body() //This is where things go wrong
收件人:
Aggregate(constant(true), new strategy()).completionFromBatchConsumer()