Camel,使用字段条件将大型 XML 文件拆分为 header

Camel, split large XML file with header, using field condition

我正在尝试设置一个 Apache Camel 路由,该路由输入一个大的 XML 文件,然后使用字段条件将有效负载拆分为两个不同的文件。 IE。如果 ID 字段以 1 开头,它将转到一个输出文件,否则转到另一个。使用 Camel 不是必须的,我也查看了 XSLT 和常规 Java 选项,但我只是觉得这应该有效。

我已经介绍了拆分实际有效载荷,但我在确保 parent 节点(包括 header 也包含在每个文件中时遇到了问题。由于文件可能很大,我想确保流用于有效负载。我觉得我已经在这里阅读了数百个不同的问题、博客条目等,几乎每个案例都涵盖了将整个文件加载到内存中,将文件平均分成多个部分,仅单独使用有效负载节点。

我的原型 XML 文件如下所示:

<root>
    <header>
        <title>Testing</title>
    </header>
    <orders>
        <order>
            <id>11</id>
            <stuff>One</stuff>
        </order>
        <order>
            <id>20</id>
            <stuff>Two</stuff>
        </order>
        <order>
            <id>12</id>
            <stuff>Three</stuff>
        </order>
    </orders> 
</root>

结果应该是两个文件-条件为真(id以1开头):

<root>
    <header>
        <title>Testing</title>
    </header>
    <orders>
        <order>
            <id>11</id>
            <stuff>One</stuff>
        </order>
        <order>
            <id>12</id>
            <stuff>Three</stuff>
        </order>
    </orders> 
</root>

条件错误:

<root>
    <header>
        <title>Testing</title>
    </header>
    <orders>
        <order>
            <id>20</id>
            <stuff>Two</stuff>
        </order>
    </orders> 
</root>

我的原型路线:

from("file:" + inputFolder)
.log("Processing file ${headers.CamelFileName}")
.split()
    .tokenizeXML("order", "*") // Includes parent in every node
    .streaming()
    .choice()
        .when(body().contains("id>1"))
            .to("direct:ones")
            .stop()
        .otherwise()
            .to("direct:others")
            .stop()
    .end()
.end();

from("direct:ones")
//.aggregate(header("ones"), new StringAggregator()) // missing end condition
.to("file:" + outputFolder + "?fileName=ones-${in.header.CamelFileName}&fileExist=Append");

from("direct:others")
//.aggregate(header("others"), new StringAggregator()) // missing end condition
.to("file:" + outputFolder + "?fileName=others-${in.header.CamelFileName}&fileExist=Append");

除了为每个节点添加 parent 标记(header 和页脚,如果你愿意的话),这按预期工作。仅使用 tokenizeXML 中的节点 returns 仅节点本身,但我不知道如何添加 header 和页脚。最好我想将 parent 标签流式传输到 header 和页脚 属性 中,并在拆分前后添加它们。

我该怎么做?我是否需要先标记 parent 标签,这是否意味着流式传输文件两次?

最后一点,您可能会注意到最后的聚合。我不想在写入文件之前聚合每个节点,因为这违背了流式传输它并使整个文件不在内存中的目的,但我想我可能会通过在写入之前聚合多个节点来获得一些性能文件,以减少为每个节点写入驱动器的性能损失。我不确定这样做是否有意义。

我无法让它与 Camel 一起工作。或者更确切地说,当使用普通 Java 来提取 header 时,我已经拥有了继续进行拆分和换回 Camel 所需的一切,这看起来很麻烦。最有可能对此进行改进的方法是,但这是我拆分 XML 有效负载的解决方案。

在两种类型的输出流之间切换不是很好,但它简化了其他所有内容的使用。另外值得注意的是,即使 XML 通常区分大小写,我还是选择了 equalsIgnoreCase 来检查标签名称。对我来说,它降低了出错的风险。最后,确保您的正则表达式使用通配符匹配整个字符串,就像正常的字符串正则表达式一样。

/**
 * Splits a XML file's payload into two new files based on a regex condition. The payload is a specific XML tag in the
 * input file that is repeated a number of times. All tags before and after the payload are added to both files in order
 * to keep the same structure.
 * 
 * The content of each payload tag is compared to the regex condition and if true, it is added to the primary output file.
 * Otherwise it is added to the secondary output file. The payload can be empty and an empty payload tag will be added to
 * the secondary output file. Note that the output will not be an unaltered copy of the input as self-closing XML tags are
 * altered to corresponding opening and closing tags.
 * 
 * Data is streamed from the input file to the output files, keeping memory usage small even with large files.
 * 
 * @param inputFilename Path and filename for the input XML file
 * @param outputFilenamePrimary Path and filename for the primary output file
 * @param outputFilenameSecondary Path and filename for the secondary output file
 * @param payloadTag XML tag name of the payload
 * @param payloadParentTag XML tag name of the payload's direct parent
 * @param splitRegex The regex split condition used on the payload content
 * @throws Exception On invalid filenames, missing input, incorrect XML structure, etc.
 */
public static void splitXMLPayload(String inputFilename, String outputFilenamePrimary, String outputFilenameSecondary, String payloadTag, String payloadParentTag, String splitRegex) throws Exception {

    XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
    XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newInstance();
    XMLEventReader xmlEventReader = null;
    FileInputStream fileInputStream = null;
    FileWriter fileWriterPrimary = null;
    FileWriter fileWriterSecondary = null;
    XMLEventWriter xmlEventWriterSplitPrimary = null;
    XMLEventWriter xmlEventWriterSplitSecondary = null;

    try {
        fileInputStream = new FileInputStream(inputFilename);
        xmlEventReader = xmlInputFactory.createXMLEventReader(fileInputStream);

        fileWriterPrimary = new FileWriter(outputFilenamePrimary);
        fileWriterSecondary = new FileWriter(outputFilenameSecondary);
        xmlEventWriterSplitPrimary = xmlOutputFactory.createXMLEventWriter(fileWriterPrimary);
        xmlEventWriterSplitSecondary = xmlOutputFactory.createXMLEventWriter(fileWriterSecondary);

        boolean isStart = true;
        boolean isEnd = false;
        boolean lastSplitIsPrimary = true;

        while (xmlEventReader.hasNext()) {
            XMLEvent xmlEvent = xmlEventReader.nextEvent();

            // Check for start of payload element
            if (!isEnd && xmlEvent.isStartElement()) {
                StartElement startElement = xmlEvent.asStartElement();
                if (startElement.getName().getLocalPart().equalsIgnoreCase(payloadTag)) {
                    if (isStart) {
                        isStart = false;
                        // Flush the event writers as we'll use the file writers for the payload
                        xmlEventWriterSplitPrimary.flush();
                        xmlEventWriterSplitSecondary.flush();
                    }

                    String order = getTagAsString(xmlEventReader, xmlEvent, payloadTag, xmlOutputFactory);
                    if (order.matches(splitRegex)) {
                        lastSplitIsPrimary = true;
                        fileWriterPrimary.write(order);
                    } else {
                        lastSplitIsPrimary = false;
                        fileWriterSecondary.write(order);
                    }
                }
            }
            // Check for end of parent tag
            else if (!isStart && !isEnd && xmlEvent.isEndElement()) {
                EndElement endElement = xmlEvent.asEndElement();
                if (endElement.getName().getLocalPart().equalsIgnoreCase(payloadParentTag)) {
                    isEnd = true;
                }
            }
            // Is neither start or end and we're handling payload (most often white space)
            else if (!isStart && !isEnd) {
                // Add to last split handled
                if (lastSplitIsPrimary) {
                    xmlEventWriterSplitPrimary.add(xmlEvent);
                    xmlEventWriterSplitPrimary.flush();
                } else {
                    xmlEventWriterSplitSecondary.add(xmlEvent);
                    xmlEventWriterSplitSecondary.flush();
                }
            }

            // Start and end is added to both files
            if (isStart || isEnd) {
                xmlEventWriterSplitPrimary.add(xmlEvent);
                xmlEventWriterSplitSecondary.add(xmlEvent);
            }
        }

    } catch (Exception e) {
        logger.error("Error in XML split", e);
        throw e;
    } finally {
        // Close the streams
        try {
            xmlEventReader.close();
        } catch (XMLStreamException e) {
            // ignore
        }
        try {
            xmlEventReader.close();
        } catch (XMLStreamException e) {
            // ignore
        }
        try {
            xmlEventWriterSplitPrimary.close();
        } catch (XMLStreamException e) {
            // ignore
        }
        try {
            xmlEventWriterSplitSecondary.close();
        } catch (XMLStreamException e) {
            // ignore
        }
        try {
            fileWriterPrimary.close();
        } catch (IOException e) {
            // ignore
        }
        try {
            fileWriterSecondary.close();
        } catch (IOException e) {
            // ignore
        }
    }
}

/**
 * Loops through the events in the {@code XMLEventReader} until the specific XML end tag is found and returns everything
 * contained within the XML tag as a String.
 * 
 * Data is streamed from the {@code XMLEventReader}, however the String can be large depending of the number of children
 * in the XML tag.
 * 
 * @param xmlEventReader The already active reader. The starting tag event is assumed to have already been read
 * @param startEvent The starting XML tag event already read from the {@code XMLEventReader}
 * @param tag The XML tag name used to find the starting XML tag
 * @param xmlOutputFactory Convenience include to avoid creating another factory
 * @return String containing everything between the starting and ending XML tag, the tags themselves included
 * @throws Exception On incorrect XML structure
 */
private static String getTagAsString(XMLEventReader xmlEventReader, XMLEvent startEvent, String tag, XMLOutputFactory xmlOutputFactory) throws Exception {
    StringWriter stringWriter = new StringWriter();
    XMLEventWriter xmlEventWriter = xmlOutputFactory.createXMLEventWriter(stringWriter);

    // Add the start tag
    xmlEventWriter.add(startEvent);

    // Add until end tag
    while (xmlEventReader.hasNext()) {
        XMLEvent xmlEvent = xmlEventReader.nextEvent();

        // End tag found
        if (xmlEvent.isEndElement() && xmlEvent.asEndElement().getName().getLocalPart().equalsIgnoreCase(tag)) {
            xmlEventWriter.add(xmlEvent);
            xmlEventWriter.close();
            stringWriter.close();

            return stringWriter.toString();
        } else {
            xmlEventWriter.add(xmlEvent);
        }
    }

    xmlEventWriter.close();
    stringWriter.close();
    throw new Exception("Invalid XML, no closing tag for <" + tag + "> found!");
}