如何在 apache camel 中正确使用聚合器将大量数据插入 mongodb

How to use Aggregator correctly in apache camel to insert huge data into mongodb

我正在使用 apache camel 从 oracle DB 中读取大量 XMLTYPE 数据(大约 500M 行),对其进行处理并将其插入 mongodb。我正在使用拆分器进行处理,输出消息在 org.bson.Document 中。 问题出在聚合器上,它 returns 异常 无法反序列化为 START_ARRAY 令牌 或插入的行数很少(只有 9 行).

这是聚合器的代码:

public class dAggregator implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Document newBody = newExchange.getIn().getBody(Document.class);
        ArrayList<Document> list = new ArrayList<>();
        if (oldExchange == null) {
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            list.add(oldExchange.getIn().getBody(Document.class));
            return oldExchange;
        }
    }
}

骆驼语境:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    
    <threadPool id="myPool" rejectedPolicy="CallerRuns" threadName="spl" maxQueueSize="120" maxPoolSize="250" poolSize="80"/>
    <threadPool id="aggPool" rejectedPolicy="CallerRuns" threadName="agg" maxQueueSize="120" maxPoolSize="150" poolSize="50"/>
     
    <route streamCache="true">
        <from uri="timer://foo?repeatCount=1"/>
            <setBody>
                <constant>SELECT * FROM CUSTOMER</constant>
            </setBody>
            <to id="sql" uri="jdbc:myds?outputType=StreamList&resetAutoCommit=false&statement.fetchSize=100000"/>
            <split parallelProcessing="true" executorServiceRef="myPool" stopOnException="true" streaming="true">
                <simple>${in.body}</simple>
                <process ref="rowProcessor"/>
                <to uri="seda:toinsert?size=1000000&amp;blockWhenFull=true"/>
            </split>
    </route>

    <route streamCache="true">
        <from uri="seda:toinsert"/>
            <aggregate completionSize="50000" completionTimeout="30000" id="aggregate_1" strategyRef="aggStrategy" parallelProcessing="true" executorServiceRef="aggPool">
            <correlationExpression>
                <constant>true</constant>
            </correlationExpression>
            <to uri="mongodb:mongoBean?database=mydb&collection=customer&operation=insert"/>
    </route>

</camelContext>

有什么不对或遗漏的吗?任何想法或提示都非常受欢迎。

感谢 Chin Huang 的建议,我将 oldExchange.getIn().getBody(Document.class) 更改为 oldExchange.getIn().getBody(ArrayList.class),现在可以正常使用了。