RequestHandlerRetryAdvice cannot be made to work with Ftp.outboundGateway in Spring Integration
My situation is similar to the one described in a related question. The difference is that I do not use WebFlux.outboundGateway but Ftp.outboundGateway, on which I call the AbstractRemoteFileOutboundGateway.Command.GET command. The common problem is that I cannot get the defined RequestHandlerRetryAdvice to be used.
The configuration looks like this (stripped down to the relevant parts):
@RestController
@RequestMapping( value = "/somepath" )
public class DownloadController
{
    private DownloadGateway downloadGateway;

    public DownloadController( DownloadGateway downloadGateway )
    {
        this.downloadGateway = downloadGateway;
    }

    @PostMapping( "/downloads" )
    public void download( @RequestParam( "filename" ) String filename )
    {
        Map<String, Object> headers = new HashMap<>();
        downloadGateway.triggerDownload( filename, headers );
    }
}

@MessagingGateway
public interface DownloadGateway
{
    @Gateway( requestChannel = "downloadFiles.input" )
    void triggerDownload( Object value, Map<String, Object> headers );
}

@Configuration
@EnableIntegration
public class FtpDefinition
{
    private FtpProperties ftpProperties;

    public FtpDefinition( FtpProperties ftpProperties )
    {
        this.ftpProperties = ftpProperties;
    }

    @Bean
    public DirectChannel gatewayDownloadsOutputChannel()
    {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow downloadFiles( RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile )
    {
        return f -> f.handle( getRemoteFile, getRetryAdvice() )
                     .channel( "gatewayDownloadsOutputChannel" );
    }

    private Consumer<GenericEndpointSpec<AbstractRemoteFileOutboundGateway<FTPFile>>> getRetryAdvice()
    {
        return e -> e.advice( ( (Supplier<RequestHandlerRetryAdvice>) () -> {
            RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
            advice.setRetryTemplate( getRetryTemplate() );
            return advice;
        } ).get() );
    }

    private RetryTemplate getRetryTemplate()
    {
        RetryTemplate result = new RetryTemplate();
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod( 5000 );
        result.setBackOffPolicy( backOffPolicy );
        return result;
    }

    @Bean
    public RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile( SessionFactory sessionFactory )
    {
        return Ftp.outboundGateway( sessionFactory,
                                    AbstractRemoteFileOutboundGateway.Command.GET,
                                    "payload" )
                  .fileExistsMode( FileExistsMode.REPLACE )
                  .localDirectoryExpression( "'" + ftpProperties.getLocalDir() + "'" )
                  .autoCreateLocalDirectory( true );
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory()
    {
        DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
        sessionFactory.setHost( ftpProperties.getServers().get( 0 ).getHost() );
        sessionFactory.setPort( ftpProperties.getServers().get( 0 ).getPort() );
        sessionFactory.setUsername( ftpProperties.getServers().get( 0 ).getUser() );
        sessionFactory.setPassword( ftpProperties.getServers().get( 0 ).getPassword() );
        return sessionFactory;
    }
}

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class FtpTestApplication {
    public static void main(String[] args) {
        SpringApplication.run( FtpTestApplication.class, args );
    }
}

@Configuration
@PropertySource( "classpath:ftp.properties" )
@ConfigurationProperties( prefix = "ftp" )
@Data
public class FtpProperties
{
    @NotNull
    private String localDir;

    @NotNull
    private List<Server> servers;

    @Data
    public static class Server
    {
        @NotNull
        private String host;

        @NotNull
        private int port;

        @NotNull
        private String user;

        @NotNull
        private String password;
    }
}
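The controller is mainly there for testing purposes; the real implementation uses a poller. My FtpProperties holds a list of servers because the real implementation uses a DelegatingSessionFactory to pick an instance based on some parameters.
From what I have read, I would expect a failed download to be retried. But if I interrupt a download on the server side (by issuing "Kick user" in a FileZilla instance), I just get an immediate stack trace and no retries: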
org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received. Server closed connection.
[...]
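I also need to upload files, for which I use Ftp.outboundAdapter. In that case, with the same RetryTemplate, if I interrupt an upload on the server side, Spring Integration makes two more attempts with a 5-second delay each and only then logs java.net.SocketException: Connection reset, all as expected.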
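The upload behaviour described above (one initial attempt, then two more, 5 seconds apart) is what a RetryTemplate gives when its retry policy is left at the default of three attempts and a FixedBackOffPolicy is set. The following is only a plain-Java sketch of that attempt/back-off pattern, not Spring Retry code; the class FixedBackOffRetrySketch and its retry method are hypothetical names made up for this illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: mimics "N attempts with a fixed pause in between".
public class FixedBackOffRetrySketch {

    // Calls the task up to maxAttempts times; sleeps backOffMillis after each
    // failed attempt except the last one, then rethrows the last exception.
    static <T> T retry(Callable<T> task, int maxAttempts, long backOffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            // Simulated upload that always fails; a short pause keeps the demo fast
            // (the configuration in the question uses 5000 ms).
            retry(() -> {
                System.out.println("attempt " + attempts.incrementAndGet());
                throw new IOException("Connection reset");
            }, 3, 50L);
        } catch (Exception e) {
            System.out.println("gave up after " + attempts.get() + " attempts: " + e.getMessage());
        }
    }
}
```

With three attempts configured, the exception only reaches the caller after the third failure, which matches the two extra upload attempts seen in the log.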
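I tried to debug a little and noticed that right before the first attempt to upload through Ftp.outboundAdapter, a breakpoint on RequestHandlerRetryAdvice.doInvoke() is hit. But when downloading through Ftp.outboundGateway, the breakpoint is never hit.
Is there a problem with my configuration, or has anyone got RequestHandlerRetryAdvice working with Ftp.outboundGateway/AbstractRemoteFileOutboundGateway.Command.GET?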
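Sorry for the delay; we are at SpringOne Platform this week.
The problem is that the gateway spec is a bean: the gateway ends up being initialized before the advice is applied.
I changed your code like this...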
@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(getRemoteFile(sessionFactory), getRetryAdvice())
            .channel("gatewayDownloadsOutputChannel");
}

...

private RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile(SessionFactory<FTPFile> sessionFactory) {
    return Ftp.outboundGateway(sessionFactory,
                    AbstractRemoteFileOutboundGateway.Command.GET,
                    "payload")
            .fileExistsMode(FileExistsMode.REPLACE)
            .localDirectoryExpression("'/tmp'")
            .autoCreateLocalDirectory(true);
}

...and it worked.
It's generally better not to deal with specs directly and to just inline them in the flow definition...
@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(Ftp.outboundGateway(sessionFactory,
                            AbstractRemoteFileOutboundGateway.Command.GET,
                            "payload")
                    .fileExistsMode(FileExistsMode.REPLACE)
                    .localDirectoryExpression("'/tmp'")
                    .autoCreateLocalDirectory(true), getRetryAdvice())
            .channel("gatewayDownloadsOutputChannel");
}
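
To illustrate why the ordering matters, here is a deliberately simplified plain-Java model; it is not Spring Integration code, and the names AdviceOrderSketch and ToyHandler are invented for this sketch. It assumes a handler that takes a snapshot of its advice chain once, when it is initialized, so any advice added afterwards is silently ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical toy model of "advice must be set before the handler is initialized".
public class AdviceOrderSketch {

    static class ToyHandler {
        private final List<UnaryOperator<String>> adviceChain = new ArrayList<>();
        private List<UnaryOperator<String>> applied = Collections.emptyList();
        private boolean initialized;

        void addAdvice(UnaryOperator<String> advice) {
            adviceChain.add(advice);
        }

        // Snapshots the advice chain exactly once; later additions have no effect.
        void init() {
            if (!initialized) {
                applied = new ArrayList<>(adviceChain);
                initialized = true;
            }
        }

        String handle(String payload) {
            init();
            String result = payload;
            for (UnaryOperator<String> advice : applied) {
                result = advice.apply(result);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        UnaryOperator<String> marker = s -> s + " [advised]";

        // Like the spec declared as a bean: initialized first, advised too late.
        ToyHandler early = new ToyHandler();
        early.init();
        early.addAdvice(marker);
        System.out.println(early.handle("GET file.txt"));  // prints "GET file.txt"

        // Like the inlined spec: advice is in place before initialization.
        ToyHandler inline = new ToyHandler();
        inline.addAdvice(marker);
        inline.init();
        System.out.println(inline.handle("GET file.txt")); // prints "GET file.txt [advised]"
    }
}
```

In this model the first handler never runs its advice, which mirrors the breakpoint in RequestHandlerRetryAdvice.doInvoke() never being hit for the download.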