Mule ESB 集合聚合器处理的问题

Issues with Mule ESB collection-aggregator processing

我试图完成的过程的一个简化版本是,我将拥有应该按顺序处理的文件集,并且只作为完整集处理。为了概念验证,我创建了一个流程来收集一组名为 "File1*YYMMDD*.txt" 和 "File2*YYMMDD*.txt" 的两个文件,这将构成一组日期 YYMMDD。我使用文件入站端点来监视文件并使用名称的日期部分来定义关联 ID。集合聚合器然后将这些分组为一组 2 和一个文件出站然后从集合中分派文件:

<configuration> 
    <default-threading-profile doThreading="false" />
</configuration>

<flow name="Aggregator">
    <file:inbound-endpoint path="G:/SourceDir" moveToDirectory="G:/SourceDir/Archive" 
                        responseTimeout="10000" doc:name="get-working-files" 
                        pollingFrequency="5000" fileAge="600000">
        <file:filename-regex-filter pattern="File1(.*).txt|File2(.*).txt" caseSensitive="false"/>
        <message-properties-transformer>
            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
            <add-message-property key="MULE_CORRELATION_ID" 
                    value="#[message.inboundProperties
                                    .originalFilename
                                    .substring(5, message.inboundProperties.originalFilename.lastIndexOf('.'))]"  />
        </message-properties-transformer>
    </file:inbound-endpoint>

    <collection-aggregator timeout="86400000" failOnTimeout="false" doc:name="Collection Aggregator">
    </collection-aggregator>  

    <foreach doc:name="For Each"> 
        <logger message="Processing: #[message.inboundProperties.originalFilename]" level="INFO" 
                doc:name="Some process"/>
        <file:outbound-endpoint responseTimeout="10000" doc:name="Destination" 
                outputPattern="#[function:datestamp:yyyyMMdd.HHmmss].#[message.inboundProperties.originalFilename]" 
                path="G:/DestDir"/>
    </foreach>  
</flow>

我有两个问题。

1) 如果我只有一个文件,比如说 File2150102.txt,流程会正确识别出文件集不完整并等待。大约 1 分钟后,该文件再次被锁定并被接受为集合中的第二个文件。该文件通过出站端点处理并存档,然后再次尝试对该文件执行此过程,但由于该文件已被删除而失败:

INFO 2015-07-14 11:19:51,205 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: G:\SourceDir\File2150102.txt INFO 2015-07-14 11:21:01,241 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: G:\SourceDir\File2150102.txt INFO 2015-07-14 11:21:01,273 [[fileset].connector.file.mule.default.receiver.01] org.mule.api.processor.LoggerMessageProcessor: Processing: File2150102.txt INFO 2015-07-14 11:21:01,304 [[fileset].connector.file.mule.default.receiver.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.file.mule.default.dispatcher.452370795'. Object is: FileMessageDispatcher INFO 2015-07-14 11:21:01,304 [[fileset].connector.file.mule.default.receiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.file.mule.default.dispatcher.452370795'. Object is: FileMessageDispatcher INFO 2015-07-14 11:21:01,320 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileConnector: Writing file to: G:\DestDir150714.112101.File2150102.txt WARN 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.ReceiverFileInputStream: Failed to move file from G:\SourceDir\File2150102.txt to G:\SourceDir\archive\File2150102.txt

INFO 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.api.processor.LoggerMessageProcessor: Processing: File2150102.txt INFO 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileConnector: Writing file to: G:\DestDir150714.112101.File2150102.txt WARN 2015-07-14 11:21:01,476 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Failure trying to remove file G:\SourceDir\File2150102.txt from list of files under processing

我找不到任何设置来控制再次抓取文件的迭代,我的轮询频率设置为 5 秒,我需要 10 分钟的文件寿命,并为收集超时设置了很长的 10天所以它应该坐等到找到另一个文件,但我不希望它再次拾取同一个文件。

2) 在更复杂的情况下,我在目录中有文件:File1150201.txt、File2150201.txt、File1150202.txt、File1150203.txt 和 File2150203.txt .流程开始抓取文件,正确查找和处理“150201”的集合并发送它。它找到 150202 的文件,识别出它需要第二个文件并且不处理它。然后它会找到“150203”的完整集合并对其进行处理。我需要它在处理“150202”集之前不处理这个集。有人能告诉我如何让它等待不完整的集合而不是继续其他集合吗?我有正确的处理顺序,只是不能等待丢失的文件并在有不完整的集合时保持集合的顺序。

我不确定我是否理解正确,但是对于您的问题 1,匹配(并等待不完整的集合)正在通过以下测试流程为我工作 --

 <file:connector name="File" autoDelete="false" streaming="false" validateConnections="true" doc:name="File">
        <file:expression-filename-parser />
    </file:connector>
    <file:connector name="File1" autoDelete="false" outputAppend="true" streaming="false" validateConnections="true" doc:name="File"/>
    <vm:connector name="VM" validateConnections="true" doc:name="VM">
    <receiver-threading-profile maxThreadsActive="1"></receiver-threading-profile>
    </vm:connector>
    <flow name="fileaggreFlow2" doc:name="fileaggreFlow2">
        <file:inbound-endpoint path="C:\InFile" moveToDirectory="C:\InFile\Archive" responseTimeout="10000" connector-ref="File" doc:name="File">

        </file:inbound-endpoint>
        <message-properties-transformer overwrite="true" doc:name="Message Properties">
            <add-message-property key="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(5,13)]"/>
            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2"/>
            <add-message-property key="MULE_CORRELATION_SEQUENCE" value="#[message.inboundProperties.originalFilename.substring(0,5)]"/>
        </message-properties-transformer>
        <vm:outbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>
    </flow>


    <flow name="fileaggreFlow1" doc:name="fileaggreFlow1" processingStrategy="synchronous">
        <vm:inbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>
        <logger level="INFO" doc:name="Logger"/>
        <processor-chain doc:name="Processor Chain">
            <collection-aggregator timeout="1000000" failOnTimeout="true" storePrefix="#[MULE_CORRELATION_ID]" doc:name="Collection Aggregator"/>
            <logger message="#[payload]" level="INFO" doc:name="Logger"/>
            <foreach doc:name="For Each">
                <logger message="Processing: #[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
                <file:outbound-endpoint path="C:\TestFile" outputPattern="#[message.inboundProperties.originalFilename.substring(5,17)]" responseTimeout="10000" connector-ref="File1" doc:name="Destination"/>
            </foreach>
        </processor-chain>
    </flow>  

如果您能post完整的流程,将会有所帮助。我的文件名是 File120150107.txt(依此类推...)

我认为你的问题只是因为你设置了 failOnTimeout = 'false'。设为 'True'

    <collection-aggregator timeout="86400000" failOnTimeout="true" doc:name="Collection Aggregator">

.

它将等待文件(2 或 3 个文件,根据您的要求),直到达到特定时间(此处为 86400000)。一旦超过,就会失败。

以您的情况为例(FailOnTime= 'False')。如果您尝试发送 4 个文件。如果只收到 2 个文件,则在该时间内。不完整的文件将被处理(它不会等待剩余的 2 个文件)。

尝试检查您计划处理多少个文件,以及处理需要多长时间(example:4 个文件),相应地调整时间。