出站网关执行 MV 命令非常慢
outbound-gateway executes the MV command very slowly
我们将Spring集成的版本从4.2.13升级到5.3.1后,SFTP Outbound Gateway经常会执行MV命令超过30秒。
我们使用 inbound-stream-channel-adapter 获取文件,然后使用 outbound-gateway 将其移动到备份文件夹,下面是我们的 xml 代码片段
<int:channel id="input">
<int:queue />
</int:channel>
<int:channel id="output">
<int:queue />
<int:interceptors>
<int:wire-tap channel="successHistory"/>
</int:interceptors>
</int:channel>
<int-sftp:inbound-streaming-channel-adapter id="sftInboundAdapter"
session-factory="cachingSftpSessionFactory"
channel="input"
remote-file-separator="/"
remote-directory="/home/box">
<int:poller fixed-delay="2000" max-messages-per-poll="1"/>
</int-sftp:inbound-streaming-channel-adapter>
<int:chain id="chain1" input-channel=" input" output-channel="output">
<int:poller fixed-delay="1000"/>
<int:stream-transformer charset="UTF-8"/>
<int:header-enricher>
<int:error-channel ref="error" overwrite="true"/>
<int:header name="originalPayload" expression="payload"/>
</int:header-enricher>
<int-sftp:outbound-gateway session-factory="cachingSftpSessionFactory"
id="sftpOutboundGateway"
command="mv"
expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"
rename-expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"
>
<int-sftp:request-handler-advice-chain>
<ref bean="gatewayLogger"/>
</int-sftp:request-handler-advice-chain>
</int-sftp:outbound-gateway>
<int:transformer expression="headers.originalPayload"/>
</int:chain>
<jms:outbound-channel-adapter channel="output" connection-factory="tibcoEmsConnectionFactory" destination="topic"/>
<bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="${sftp.host}"/>
<property name="port" value="${sftp.port}"/>
<property name="user" value="${sftp.user}"/>
<property name="password" value="${sftp.password}"/>
<property name="allowUnknownKeys" value="true"/>
<property name="timeout" value="300000"/>
</bean>
<bean id="cachingSftpSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="sftpSessionFactory"/>
<constructor-arg value="2"/>
<property name="sessionWaitTimeout" value="300000"/>
</bean>
Gateway Advice生成的日志如下,rename(MV)操作耗时30多秒
2020-07-07 12:20:16 INFO [task-scheduler-8] gatewayLogger - ''int-sftp:outbound-gateway' with id='sftpOutboundGateway''@1346093219 - before: {file_remoteHostPort=0.0.0.0, fileName=20200707115747609.xml, errorChannel=bean 'error', file_remoteDirectory=/home/box, originalPayload=<?xml version="1.0" encoding="UTF-8"?>
2020-07-07 12:20:48 INFO [task-scheduler-8] gatewayLogger - ''int-sftp:outbound-gateway' with id='sftpOutboundGateway''@1346093219 - after: org.springframework.integration.support.MessageBuilder@153944c0
由于我们使用链来处理消息,session会通过Stream transformer释放,如果gateway运行时间过长,消息会在queue中pend,相应的session无法释放,会导致message卡住,适配器将用完缓存中的所有会话。
我认为问题在于您如何使用 CachingSessionFactory
。您的 <constructor-arg value="2"/>
缓存太低,因此缓存会话很有可能出现竞争条件。
您在 <int-sftp:inbound-streaming-channel-adapter>
中使用此会话工厂,它会打开一个会话并将其保留在缓存之外,直到 <int:stream-transformer>
。但这已经在另一个线程上发生了,因为你的 input
频道是 QueueChannel
。通过这种方式,您可以让 <int-sftp:inbound-streaming-channel-adapter>
的线程运行,并且该线程能够从缓存中获取新会话(如果有的话)。所以,当 <int-sftp:outbound-gateway>
轮到时,缓存中可能没有会话需要处理。
请解释一下,为什么你的缓存这么低,为什么你在入站轮询通道适配器之后使用 QueueChannel
?不相关,但为什么您也将 QueueChannel
用于 output
目的地?
我认为 SpringIntegration-5.3.1 在 int-sftp:outbound-gateway 中有一个错误,因为我们可以很容易地重现 sftp 网关在某些机器上长时间执行 mv 命令(我们的产品)
但是我们用自己的activator替换gateway后,mv命令执行的非常非常快
我们替换了:
<int-sftp:outbound-gateway session-factory="cachingSftpSessionFactory"
id="sftpOutboundGateway"
command="mv"
expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"
rename-expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"
>
与:
<int:header-enricher>
<int:header name="PATH_FROM" expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"/>
<int:header name="PATH_TO" expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"/>
</int:header-enricher>
<int:service-activator ref="remoteFileRenameActivator"/>
这里是我们的 remoteFileRenameActivator 的源代码
@ServiceActivator
public Message moveFile(Message message, @Header("PATH_FROM") String pathFrom, @Header("PATH_TO") String pathTo) throws IOException {
try (Session session = sessionFactory.getSession();) {
LOGGER.debug(contextName + " " + session.toString() + " is moving file from " + pathFrom + " to " + pathTo);
session.rename(pathFrom, pathTo);
}
return message;
}
我们认为这是一个错误的原因是:
- 我们将 Spring 集成从 4.2.13 升级到 5.3.1,我们没有见面
4.2.13中的此类问题
- 我们将网关的 mv 命令替换为
我们自己实现的mv命令,mv命令执行不是
瓶颈了。
- 修改后问题依旧
QueueChannel 到 DirectChannel 并增加会话数量。
- 我们运行在客户端用命令行重命名命令,也很快
由org.springframework.integration.file.remote.RemoteFileUtils#makeDirectories引起,是synchronized静态方法,当有大量(S)ftp move操作并发且网速慢时,AbstractRemoteFileOutboundGateway#mv的所有请求排队并观察到速度非常慢。
方法签名如下:
public static synchronized <F> void makeDirectories(String path, Session<F> session, String remoteFileSeparator,
Log logger) throws IOException {
我们将Spring集成的版本从4.2.13升级到5.3.1后,SFTP Outbound Gateway经常会执行MV命令超过30秒。 我们使用 inbound-stream-channel-adapter 获取文件,然后使用 outbound-gateway 将其移动到备份文件夹,下面是我们的 xml 代码片段
<int:channel id="input">
<int:queue />
</int:channel>
<int:channel id="output">
<int:queue />
<int:interceptors>
<int:wire-tap channel="successHistory"/>
</int:interceptors>
</int:channel>
<int-sftp:inbound-streaming-channel-adapter id="sftInboundAdapter"
session-factory="cachingSftpSessionFactory"
channel="input"
remote-file-separator="/"
remote-directory="/home/box">
<int:poller fixed-delay="2000" max-messages-per-poll="1"/>
</int-sftp:inbound-streaming-channel-adapter>
<int:chain id="chain1" input-channel=" input" output-channel="output">
<int:poller fixed-delay="1000"/>
<int:stream-transformer charset="UTF-8"/>
<int:header-enricher>
<int:error-channel ref="error" overwrite="true"/>
<int:header name="originalPayload" expression="payload"/>
</int:header-enricher>
<int-sftp:outbound-gateway session-factory="cachingSftpSessionFactory"
id="sftpOutboundGateway"
command="mv"
expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"
rename-expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"
>
<int-sftp:request-handler-advice-chain>
<ref bean="gatewayLogger"/>
</int-sftp:request-handler-advice-chain>
</int-sftp:outbound-gateway>
<int:transformer expression="headers.originalPayload"/>
</int:chain>
<jms:outbound-channel-adapter channel="output" connection-factory="tibcoEmsConnectionFactory" destination="topic"/>
<bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="${sftp.host}"/>
<property name="port" value="${sftp.port}"/>
<property name="user" value="${sftp.user}"/>
<property name="password" value="${sftp.password}"/>
<property name="allowUnknownKeys" value="true"/>
<property name="timeout" value="300000"/>
</bean>
<bean id="cachingSftpSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="sftpSessionFactory"/>
<constructor-arg value="2"/>
<property name="sessionWaitTimeout" value="300000"/>
</bean>
Gateway Advice生成的日志如下,rename(MV)操作耗时30多秒
2020-07-07 12:20:16 INFO [task-scheduler-8] gatewayLogger - ''int-sftp:outbound-gateway' with id='sftpOutboundGateway''@1346093219 - before: {file_remoteHostPort=0.0.0.0, fileName=20200707115747609.xml, errorChannel=bean 'error', file_remoteDirectory=/home/box, originalPayload=<?xml version="1.0" encoding="UTF-8"?>
2020-07-07 12:20:48 INFO [task-scheduler-8] gatewayLogger - ''int-sftp:outbound-gateway' with id='sftpOutboundGateway''@1346093219 - after: org.springframework.integration.support.MessageBuilder@153944c0
由于我们使用链来处理消息,session会通过Stream transformer释放,如果gateway运行时间过长,消息会在queue中pend,相应的session无法释放,会导致message卡住,适配器将用完缓存中的所有会话。
我认为问题在于您如何使用 CachingSessionFactory
。您的 <constructor-arg value="2"/>
缓存太低,因此缓存会话很有可能出现竞争条件。
您在 <int-sftp:inbound-streaming-channel-adapter>
中使用此会话工厂,它会打开一个会话并将其保留在缓存之外,直到 <int:stream-transformer>
。但这已经在另一个线程上发生了,因为你的 input
频道是 QueueChannel
。通过这种方式,您可以让 <int-sftp:inbound-streaming-channel-adapter>
的线程运行,并且该线程能够从缓存中获取新会话(如果有的话)。所以,当 <int-sftp:outbound-gateway>
轮到时,缓存中可能没有会话需要处理。
请解释一下,为什么你的缓存这么低,为什么你在入站轮询通道适配器之后使用 QueueChannel
?不相关,但为什么您也将 QueueChannel
用于 output
目的地?
我认为 SpringIntegration-5.3.1 在 int-sftp:outbound-gateway 中有一个错误,因为我们可以很容易地重现 sftp 网关在某些机器上长时间执行 mv 命令(我们的产品)
但是我们用自己的activator替换gateway后,mv命令执行的非常非常快
我们替换了:
<int-sftp:outbound-gateway session-factory="cachingSftpSessionFactory"
id="sftpOutboundGateway"
command="mv"
expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"
rename-expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"
>
与:
<int:header-enricher>
<int:header name="PATH_FROM" expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"/>
<int:header name="PATH_TO" expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"/>
</int:header-enricher>
<int:service-activator ref="remoteFileRenameActivator"/>
这里是我们的 remoteFileRenameActivator 的源代码
@ServiceActivator
public Message moveFile(Message message, @Header("PATH_FROM") String pathFrom, @Header("PATH_TO") String pathTo) throws IOException {
try (Session session = sessionFactory.getSession();) {
LOGGER.debug(contextName + " " + session.toString() + " is moving file from " + pathFrom + " to " + pathTo);
session.rename(pathFrom, pathTo);
}
return message;
}
我们认为这是一个错误的原因是:
- 我们将 Spring 集成从 4.2.13 升级到 5.3.1,我们没有见面 4.2.13中的此类问题
- 我们将网关的 mv 命令替换为 我们自己实现的mv命令,mv命令执行不是 瓶颈了。
- 修改后问题依旧 QueueChannel 到 DirectChannel 并增加会话数量。
- 我们运行在客户端用命令行重命名命令,也很快
由org.springframework.integration.file.remote.RemoteFileUtils#makeDirectories引起,是synchronized静态方法,当有大量(S)ftp move操作并发且网速慢时,AbstractRemoteFileOutboundGateway#mv的所有请求排队并观察到速度非常慢。
方法签名如下:
public static synchronized <F> void makeDirectories(String path, Session<F> session, String remoteFileSeparator,
Log logger) throws IOException {