Mule ESB 集合聚合器处理的问题
Issues with Mule ESB collection-aggregator processing
我试图完成的过程的一个简化版本是,我将拥有应该按顺序处理的文件集,并且只作为完整集处理。为了概念验证,我创建了一个流程来收集一组名为 "File1*YYMMDD*.txt" 和 "File2*YYMMDD*.txt" 的两个文件,这将构成一组日期 YYMMDD。我使用文件入站端点来监视文件并使用名称的日期部分来定义关联 ID。集合聚合器然后将这些分组为一组 2 和一个文件出站然后从集合中分派文件:
<configuration>
<default-threading-profile doThreading="false" />
</configuration>
<flow name="Aggregator">
<file:inbound-endpoint path="G:/SourceDir" moveToDirectory="G:/SourceDir/Archive"
responseTimeout="10000" doc:name="get-working-files"
pollingFrequency="5000" fileAge="600000">
<file:filename-regex-filter pattern="File1(.*).txt|File2(.*).txt" caseSensitive="false"/>
<message-properties-transformer>
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
<add-message-property key="MULE_CORRELATION_ID"
value="#[message.inboundProperties
.originalFilename
.substring(5, message.inboundProperties.originalFilename.lastIndexOf('.'))]" />
</message-properties-transformer>
</file:inbound-endpoint>
<collection-aggregator timeout="86400000" failOnTimeout="false" doc:name="Collection Aggregator">
</collection-aggregator>
<foreach doc:name="For Each">
<logger message="Processing: #[message.inboundProperties.originalFilename]" level="INFO"
doc:name="Some process"/>
<file:outbound-endpoint responseTimeout="10000" doc:name="Destination"
outputPattern="#[function:datestamp:yyyyMMdd.HHmmss].#[message.inboundProperties.originalFilename]"
path="G:/DestDir"/>
</foreach>
</flow>
我有两个问题。
1) 如果我只有一个文件,比如说 File2150102.txt,流程会正确识别出文件集不完整并等待。大约 1 分钟后,该文件再次被锁定并被接受为集合中的第二个文件。该文件通过出站端点处理并存档,然后再次尝试对该文件执行此过程,但由于该文件已被删除而失败:
INFO 2015-07-14 11:19:51,205 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: G:\SourceDir\File2150102.txt
INFO 2015-07-14 11:21:01,241 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: G:\SourceDir\File2150102.txt
INFO 2015-07-14 11:21:01,273 [[fileset].connector.file.mule.default.receiver.01] org.mule.api.processor.LoggerMessageProcessor: Processing: File2150102.txt
INFO 2015-07-14 11:21:01,304 [[fileset].connector.file.mule.default.receiver.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.file.mule.default.dispatcher.452370795'. Object is: FileMessageDispatcher
INFO 2015-07-14 11:21:01,304 [[fileset].connector.file.mule.default.receiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.file.mule.default.dispatcher.452370795'. Object is: FileMessageDispatcher
INFO 2015-07-14 11:21:01,320 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileConnector: Writing file to: G:\DestDir150714.112101.File2150102.txt
WARN 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.ReceiverFileInputStream: Failed to move file from G:\SourceDir\File2150102.txt to G:\SourceDir\archive\File2150102.txt
INFO 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.api.processor.LoggerMessageProcessor: Processing: File2150102.txt
INFO 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileConnector: Writing file to: G:\DestDir150714.112101.File2150102.txt
WARN 2015-07-14 11:21:01,476 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Failure trying to remove file G:\SourceDir\File2150102.txt from list of files under processing
我找不到任何设置来控制再次抓取文件的迭代,我的轮询频率设置为 5 秒,我需要 10 分钟的文件寿命,并为收集超时设置了很长的 10天所以它应该坐等到找到另一个文件,但我不希望它再次拾取同一个文件。
2) 在更复杂的情况下,我在目录中有文件:File1150201.txt、File2150201.txt、File1150202.txt、File1150203.txt 和 File2150203.txt .流程开始抓取文件,正确查找和处理“150201”的集合并发送它。它找到 150202 的文件,识别出它需要第二个文件并且不处理它。然后它会找到“150203”的完整集合并对其进行处理。我需要它在处理“150202”集之前不处理这个集。有人能告诉我如何让它等待不完整的集合而不是继续其他集合吗?我有正确的处理顺序,只是不能等待丢失的文件并在有不完整的集合时保持集合的顺序。
我不确定我是否理解正确,但是对于您的问题 1,匹配(并等待不完整的集合)正在通过以下测试流程为我工作 --
<file:connector name="File" autoDelete="false" streaming="false" validateConnections="true" doc:name="File">
<file:expression-filename-parser />
</file:connector>
<file:connector name="File1" autoDelete="false" outputAppend="true" streaming="false" validateConnections="true" doc:name="File"/>
<vm:connector name="VM" validateConnections="true" doc:name="VM">
<receiver-threading-profile maxThreadsActive="1"></receiver-threading-profile>
</vm:connector>
<flow name="fileaggreFlow2" doc:name="fileaggreFlow2">
<file:inbound-endpoint path="C:\InFile" moveToDirectory="C:\InFile\Archive" responseTimeout="10000" connector-ref="File" doc:name="File">
</file:inbound-endpoint>
<message-properties-transformer overwrite="true" doc:name="Message Properties">
<add-message-property key="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(5,13)]"/>
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2"/>
<add-message-property key="MULE_CORRELATION_SEQUENCE" value="#[message.inboundProperties.originalFilename.substring(0,5)]"/>
</message-properties-transformer>
<vm:outbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>
</flow>
<flow name="fileaggreFlow1" doc:name="fileaggreFlow1" processingStrategy="synchronous">
<vm:inbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>
<logger level="INFO" doc:name="Logger"/>
<processor-chain doc:name="Processor Chain">
<collection-aggregator timeout="1000000" failOnTimeout="true" storePrefix="#[MULE_CORRELATION_ID]" doc:name="Collection Aggregator"/>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
<foreach doc:name="For Each">
<logger message="Processing: #[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
<file:outbound-endpoint path="C:\TestFile" outputPattern="#[message.inboundProperties.originalFilename.substring(5,17)]" responseTimeout="10000" connector-ref="File1" doc:name="Destination"/>
</foreach>
</processor-chain>
</flow>
如果您能post完整的流程,将会有所帮助。我的文件名是 File120150107.txt(依此类推...)
我认为你的问题只是因为你设置了 failOnTimeout = 'false'。设为 'True'
<collection-aggregator timeout="86400000" failOnTimeout="true" doc:name="Collection Aggregator">
.
它将等待文件(2 或 3 个文件,根据您的要求),直到达到特定时间(此处为 86400000)。一旦超过,就会失败。
以您的情况为例(FailOnTime= 'False')。如果您尝试发送 4 个文件。如果只收到 2 个文件,则在该时间内。不完整的文件将被处理(它不会等待剩余的 2 个文件)。
尝试检查您计划处理多少个文件,以及处理需要多长时间(example:4 个文件),相应地调整时间。
我试图完成的过程的一个简化版本是,我将拥有应该按顺序处理的文件集,并且只作为完整集处理。为了概念验证,我创建了一个流程来收集一组名为 "File1*YYMMDD*.txt" 和 "File2*YYMMDD*.txt" 的两个文件,这将构成一组日期 YYMMDD。我使用文件入站端点来监视文件并使用名称的日期部分来定义关联 ID。集合聚合器然后将这些分组为一组 2 和一个文件出站然后从集合中分派文件:
<configuration>
<default-threading-profile doThreading="false" />
</configuration>
<flow name="Aggregator">
<file:inbound-endpoint path="G:/SourceDir" moveToDirectory="G:/SourceDir/Archive"
responseTimeout="10000" doc:name="get-working-files"
pollingFrequency="5000" fileAge="600000">
<file:filename-regex-filter pattern="File1(.*).txt|File2(.*).txt" caseSensitive="false"/>
<message-properties-transformer>
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
<add-message-property key="MULE_CORRELATION_ID"
value="#[message.inboundProperties
.originalFilename
.substring(5, message.inboundProperties.originalFilename.lastIndexOf('.'))]" />
</message-properties-transformer>
</file:inbound-endpoint>
<collection-aggregator timeout="86400000" failOnTimeout="false" doc:name="Collection Aggregator">
</collection-aggregator>
<foreach doc:name="For Each">
<logger message="Processing: #[message.inboundProperties.originalFilename]" level="INFO"
doc:name="Some process"/>
<file:outbound-endpoint responseTimeout="10000" doc:name="Destination"
outputPattern="#[function:datestamp:yyyyMMdd.HHmmss].#[message.inboundProperties.originalFilename]"
path="G:/DestDir"/>
</foreach>
</flow>
我有两个问题。
1) 如果我只有一个文件,比如说 File2150102.txt,流程会正确识别出文件集不完整并等待。大约 1 分钟后,该文件再次被锁定并被接受为集合中的第二个文件。该文件通过出站端点处理并存档,然后再次尝试对该文件执行此过程,但由于该文件已被删除而失败:
INFO 2015-07-14 11:19:51,205 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: G:\SourceDir\File2150102.txt INFO 2015-07-14 11:21:01,241 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: G:\SourceDir\File2150102.txt INFO 2015-07-14 11:21:01,273 [[fileset].connector.file.mule.default.receiver.01] org.mule.api.processor.LoggerMessageProcessor: Processing: File2150102.txt INFO 2015-07-14 11:21:01,304 [[fileset].connector.file.mule.default.receiver.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.file.mule.default.dispatcher.452370795'. Object is: FileMessageDispatcher INFO 2015-07-14 11:21:01,304 [[fileset].connector.file.mule.default.receiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.file.mule.default.dispatcher.452370795'. Object is: FileMessageDispatcher INFO 2015-07-14 11:21:01,320 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileConnector: Writing file to: G:\DestDir150714.112101.File2150102.txt WARN 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.ReceiverFileInputStream: Failed to move file from G:\SourceDir\File2150102.txt to G:\SourceDir\archive\File2150102.txt
INFO 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.api.processor.LoggerMessageProcessor: Processing: File2150102.txt INFO 2015-07-14 11:21:01,336 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileConnector: Writing file to: G:\DestDir150714.112101.File2150102.txt WARN 2015-07-14 11:21:01,476 [[fileset].connector.file.mule.default.receiver.01] org.mule.transport.file.FileMessageReceiver: Failure trying to remove file G:\SourceDir\File2150102.txt from list of files under processing
我找不到任何设置来控制再次抓取文件的迭代,我的轮询频率设置为 5 秒,我需要 10 分钟的文件寿命,并为收集超时设置了很长的 10天所以它应该坐等到找到另一个文件,但我不希望它再次拾取同一个文件。
2) 在更复杂的情况下,我在目录中有文件:File1150201.txt、File2150201.txt、File1150202.txt、File1150203.txt 和 File2150203.txt .流程开始抓取文件,正确查找和处理“150201”的集合并发送它。它找到 150202 的文件,识别出它需要第二个文件并且不处理它。然后它会找到“150203”的完整集合并对其进行处理。我需要它在处理“150202”集之前不处理这个集。有人能告诉我如何让它等待不完整的集合而不是继续其他集合吗?我有正确的处理顺序,只是不能等待丢失的文件并在有不完整的集合时保持集合的顺序。
我不确定我是否理解正确,但是对于您的问题 1,匹配(并等待不完整的集合)正在通过以下测试流程为我工作 --
<file:connector name="File" autoDelete="false" streaming="false" validateConnections="true" doc:name="File">
<file:expression-filename-parser />
</file:connector>
<file:connector name="File1" autoDelete="false" outputAppend="true" streaming="false" validateConnections="true" doc:name="File"/>
<vm:connector name="VM" validateConnections="true" doc:name="VM">
<receiver-threading-profile maxThreadsActive="1"></receiver-threading-profile>
</vm:connector>
<flow name="fileaggreFlow2" doc:name="fileaggreFlow2">
<file:inbound-endpoint path="C:\InFile" moveToDirectory="C:\InFile\Archive" responseTimeout="10000" connector-ref="File" doc:name="File">
</file:inbound-endpoint>
<message-properties-transformer overwrite="true" doc:name="Message Properties">
<add-message-property key="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(5,13)]"/>
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2"/>
<add-message-property key="MULE_CORRELATION_SEQUENCE" value="#[message.inboundProperties.originalFilename.substring(0,5)]"/>
</message-properties-transformer>
<vm:outbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>
</flow>
<flow name="fileaggreFlow1" doc:name="fileaggreFlow1" processingStrategy="synchronous">
<vm:inbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>
<logger level="INFO" doc:name="Logger"/>
<processor-chain doc:name="Processor Chain">
<collection-aggregator timeout="1000000" failOnTimeout="true" storePrefix="#[MULE_CORRELATION_ID]" doc:name="Collection Aggregator"/>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
<foreach doc:name="For Each">
<logger message="Processing: #[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
<file:outbound-endpoint path="C:\TestFile" outputPattern="#[message.inboundProperties.originalFilename.substring(5,17)]" responseTimeout="10000" connector-ref="File1" doc:name="Destination"/>
</foreach>
</processor-chain>
</flow>
如果您能post完整的流程,将会有所帮助。我的文件名是 File120150107.txt(依此类推...)
我认为你的问题只是因为你设置了 failOnTimeout = 'false'。设为 'True'
<collection-aggregator timeout="86400000" failOnTimeout="true" doc:name="Collection Aggregator">
.
它将等待文件(2 或 3 个文件,根据您的要求),直到达到特定时间(此处为 86400000)。一旦超过,就会失败。
以您的情况为例(FailOnTime= 'False')。如果您尝试发送 4 个文件。如果只收到 2 个文件,则在该时间内。不完整的文件将被处理(它不会等待剩余的 2 个文件)。
尝试检查您计划处理多少个文件,以及处理需要多长时间(example:4 个文件),相应地调整时间。