出站网关执行 MV 命令非常慢

outbound-gateway executes the MV command very slowly

我们将Spring集成的版本从4.2.13升级到5.3.1后,SFTP Outbound Gateway经常会执行MV命令超过30秒。 我们使用 inbound-stream-channel-adapter 获取文件,然后使用 outbound-gateway 将其移动到备份文件夹,下面是我们的 xml 代码片段

<int:channel id="input">
    <int:queue />
</int:channel>
<int:channel id="output">
    <int:queue />
    <int:interceptors>
        <int:wire-tap channel="successHistory"/>
    </int:interceptors>
</int:channel>
<int-sftp:inbound-streaming-channel-adapter id="sftInboundAdapter"
                                            session-factory="cachingSftpSessionFactory"
                                            channel="input"
                                            remote-file-separator="/"
                                            remote-directory="/home/box">
    <int:poller fixed-delay="2000" max-messages-per-poll="1"/>
</int-sftp:inbound-streaming-channel-adapter>

<int:chain id="chain1" input-channel=" input" output-channel="output">
    <int:poller fixed-delay="1000"/>
    <int:stream-transformer charset="UTF-8"/>
    <int:header-enricher>
        <int:error-channel ref="error" overwrite="true"/>
        <int:header name="originalPayload" expression="payload"/>
    </int:header-enricher>
    <int-sftp:outbound-gateway session-factory="cachingSftpSessionFactory"
                               id="sftpOutboundGateway"
                               command="mv"
                               expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"
                               rename-expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"
    >
            <int-sftp:request-handler-advice-chain>
                <ref bean="gatewayLogger"/>
            </int-sftp:request-handler-advice-chain>
        </int-sftp:outbound-gateway>
    <int:transformer expression="headers.originalPayload"/>
</int:chain>
<jms:outbound-channel-adapter channel="output" connection-factory="tibcoEmsConnectionFactory" destination="topic"/>

<bean id="sftpSessionFactory"
      class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="${sftp.host}"/>
    <property name="port" value="${sftp.port}"/>
    <property name="user" value="${sftp.user}"/>
    <property name="password" value="${sftp.password}"/>
    <property name="allowUnknownKeys" value="true"/>
    <property name="timeout" value="300000"/>
</bean>

<bean id="cachingSftpSessionFactory"
      class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory"/>
    <constructor-arg value="2"/>
    <property name="sessionWaitTimeout" value="300000"/>
</bean>

Gateway Advice生成的日志如下,rename(MV)操作耗时30多秒

2020-07-07 12:20:16 INFO  [task-scheduler-8] gatewayLogger - ''int-sftp:outbound-gateway' with id='sftpOutboundGateway''@1346093219 - before:  {file_remoteHostPort=0.0.0.0, fileName=20200707115747609.xml, errorChannel=bean 'error', file_remoteDirectory=/home/box, originalPayload=<?xml version="1.0" encoding="UTF-8"?>

2020-07-07 12:20:48 INFO  [task-scheduler-8] gatewayLogger - ''int-sftp:outbound-gateway' with id='sftpOutboundGateway''@1346093219 - after:  org.springframework.integration.support.MessageBuilder@153944c0

由于我们使用链来处理消息,session会通过Stream transformer释放,如果gateway运行时间过长,消息会在queue中pend,相应的session无法释放,会导致message卡住,适配器将用完缓存中的所有会话。

我认为问题在于您如何使用 CachingSessionFactory。您的 <constructor-arg value="2"/> 缓存太低,因此缓存会话很有可能出现竞争条件。

您在 <int-sftp:inbound-streaming-channel-adapter> 中使用此会话工厂,它会打开一个会话并将其保留在缓存之外,直到 <int:stream-transformer>。但这已经在另一个线程上发生了,因为你的 input 频道是 QueueChannel。通过这种方式,您可以让 <int-sftp:inbound-streaming-channel-adapter> 的线程运行,并且该线程能够从缓存中获取新会话(如果有的话)。所以,当 <int-sftp:outbound-gateway> 轮到时,缓存中可能没有会话需要处理。

请解释一下,为什么你的缓存这么低,为什么你在入站轮询通道适配器之后使用 QueueChannel?不相关,但为什么您也将 QueueChannel 用于 output 目的地?

我认为 SpringIntegration-5.3.1 在 int-sftp:outbound-gateway 中有一个错误,因为我们可以很容易地重现 sftp 网关在某些机器上长时间执行 mv 命令(我们的产品)

但是我们用自己的activator替换gateway后,mv命令执行的非常非常快

我们替换了:

<int-sftp:outbound-gateway session-factory="cachingSftpSessionFactory"
                               id="sftpOutboundGateway"
                               command="mv"
                               expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"
                               rename-expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"
    >

与:

  <int:header-enricher>
            <int:header name="PATH_FROM" expression="headers.file_remoteDirectory+'/'+headers.file_remoteFile"/>
            <int:header name="PATH_TO" expression="headers.file_remoteDirectory+'/backup/'+headers.file_remoteFile"/>
        </int:header-enricher>
        <int:service-activator ref="remoteFileRenameActivator"/>

这里是我们的 remoteFileRenameActivator 的源代码

@ServiceActivator
    public Message moveFile(Message message, @Header("PATH_FROM") String pathFrom, @Header("PATH_TO") String pathTo) throws IOException {
        try (Session session = sessionFactory.getSession();) {
            LOGGER.debug(contextName + " " + session.toString() + " is moving file from " + pathFrom + " to " + pathTo);
            session.rename(pathFrom, pathTo);
        }
        return message;
    }

我们认为这是一个错误的原因是:

  1. 我们将 Spring 集成从 4.2.13 升级到 5.3.1,我们没有见面 4.2.13中的此类问题
  2. 我们将网关的 mv 命令替换为 我们自己实现的mv命令,mv命令执行不是 瓶颈了。
  3. 修改后问题依旧 QueueChannel 到 DirectChannel 并增加会话数量。
  4. 我们运行在客户端用命令行重命名命令,也很快

由org.springframework.integration.file.remote.RemoteFileUtils#makeDirectories引起,是synchronized静态方法,当有大量(S)ftp move操作并发且网速慢时,AbstractRemoteFileOutboundGateway#mv的所有请求排队并观察到速度非常慢。

方法签名如下:

public static synchronized <F> void makeDirectories(String path, Session<F> session, String remoteFileSeparator,
        Log logger) throws IOException {