How to use the Aggregator correctly in Apache Camel to insert huge data into MongoDB
I am using Apache Camel to read a large amount of XMLTYPE data (about 500M rows) from an Oracle DB, process it, and insert it into MongoDB. I use a splitter for the processing, and the output messages are org.bson.Document.
The problem is the aggregator: it either returns a "can not deserialize out of START_ARRAY token" exception, or it inserts only a few rows (just 9).
Here is the aggregator code:
import java.util.ArrayList;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.bson.Document;

public class dAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Document newBody = newExchange.getIn().getBody(Document.class);
        ArrayList<Document> list = new ArrayList<>();
        if (oldExchange == null) {
            // first exchange in the batch: start a new list
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            list.add(oldExchange.getIn().getBody(Document.class));
            return oldExchange;
        }
    }
}
The Camel context:
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <threadPool id="myPool" rejectedPolicy="CallerRuns" threadName="spl" maxQueueSize="120" maxPoolSize="250" poolSize="80"/>
    <threadPool id="aggPool" rejectedPolicy="CallerRuns" threadName="agg" maxQueueSize="120" maxPoolSize="150" poolSize="50"/>
    <route streamCache="true">
        <from uri="timer://foo?repeatCount=1"/>
        <setBody>
            <constant>SELECT * FROM CUSTOMER</constant>
        </setBody>
        <to id="sql" uri="jdbc:myds?outputType=StreamList&amp;resetAutoCommit=false&amp;statement.fetchSize=100000"/>
        <split parallelProcessing="true" executorServiceRef="myPool" stopOnException="true" streaming="true">
            <simple>${in.body}</simple>
            <process ref="rowProcessor"/>
            <to uri="seda:toinsert?size=1000000&amp;blockWhenFull=true"/>
        </split>
    </route>
    <route streamCache="true">
        <from uri="seda:toinsert"/>
        <aggregate completionSize="50000" completionTimeout="30000" id="aggregate_1" strategyRef="aggStrategy" parallelProcessing="true" executorServiceRef="aggPool">
            <correlationExpression>
                <constant>true</constant>
            </correlationExpression>
            <to uri="mongodb:mongoBean?database=mydb&amp;collection=customer&amp;operation=insert"/>
        </aggregate>
    </route>
</camelContext>
Is there anything wrong or missing here? Any ideas or hints are very welcome.
Update: Thanks to Chin Huang's suggestion, I changed oldExchange.getIn().getBody(Document.class) to oldExchange.getIn().getBody(ArrayList.class), and it now works correctly.
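For reference, here is a minimal sketch of the corrected strategy, assuming the fix means reading the accumulated ArrayList back from the old exchange and appending each new Document to it (the package shown is the Camel 2.x location of AggregationStrategy; in Camel 3+ it is org.apache.camel.AggregationStrategy):

import java.util.ArrayList;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.bson.Document;

public class dAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Document newBody = newExchange.getIn().getBody(Document.class);
        if (oldExchange == null) {
            // first exchange in the batch: start a new list
            ArrayList<Document> list = new ArrayList<>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        }
        // subsequent exchanges: fetch the list built so far and append the new document
        @SuppressWarnings("unchecked")
        ArrayList<Document> list = oldExchange.getIn().getBody(ArrayList.class);
        list.add(newBody);
        return oldExchange;
    }
}

With this in place, the aggregated exchange carries an ArrayList of Documents, which the camel-mongodb insert operation treats as a bulk insert of up to completionSize (50,000) documents per call.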