我如何通过 java 1.7 中的 Spring DSL 在 Sftp 入站适配器中动态传输消息

How could I transfer message dynamically in Sftp Inbound adapter through Spring DSL in java 1.7

我有一个 Sftp 入站流,我从 DefaultSftpSessionFactory 获得了会话信息。但是我需要动态地实现多个会话信息,我将从数据库 table 中获取这些信息。这意味着我需要在集成流程中实施多个 Sftp 服务器详细信息。现在我已经完成了从单个源到单个目标的文件传输,但我需要实现多个源到多个目标。那么任何人都可以对此提供一些指导。

这是我的会话工厂...这里我有单个 Sftp 服务器信息,但如何配置多个服务器详细信息。

    @Autowired
    private DefaultSftpSessionFactory sftpSessionFactory;

    @Bean
    public DefaultSftpSessionFactory sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(
                true);
        factory.setHost("111.11.12.143");
        factory.setPort(22);
        factory.setUser("sftp");
        factory.setPassword("*******");         
        return factory;
    }

这是我的 Sftp 入站流量..

    @Bean
    public IntegrationFlow sftpInboundFlow() {
    System.out.println("enter sftpInboundFlow....."
            + sftpSessionFactory.getSession());     
    return IntegrationFlows
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true).remoteDirectory(remDir)
                    .regexFilter(".*\.txt$")
                    .localFilenameExpression("#this.toUpperCase()")
                    .localDirectory(new File(localDir))
                    .remoteFileSeparator("/"),
                    new Consumer<SourcePollingChannelAdapterSpec>() {
                        @Override
                        public void accept(SourcePollingChannelAdapterSpec e) {
                            e.id("sftpInboundAdapter")
                                    .autoStartup(true)
                                    .poller(Pollers.fixedRate(1000)
                                            .maxMessagesPerPoll(1));
                        }
                    })
            //.channel(MessageChannels.queue("sftpInboundResultChannel"))
                    .channel(sftpInboundResultChannel())
            .get();
}

按照 Gary 的建议,我正在编辑我的 post.....

嗨,加里, 我正在参考 Github dynamic FTP example.

通过 ChannelResolver class 我需要调用我上面的 DSL class。并在不使用 XML.

的情况下在上下文 属性 中设置动态值

在我的 ChannelResolver class 我想要这样的东西

StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();
props.setProperty("inbound.host", host);    //I am getting the value of 'host' from a DB table.
PropertiesPropertySource pps = new PropertiesPropertySource("sftpprop", props);
env.getPropertySources().addLast(pps);
context.setEnvironment(env); 

And my DSL class I need to use like this.
@Value("${inbound.host}")
private String host;

So in this way can I set dynamic value for String 'host' ? 

我正在编辑我的原创post......

In my Outbound dynamic resolver class I am doing like this


    StandardEnvironment env = new StandardEnvironment();
    Properties props = new Properties();        
    props.setProperty("outbound.host", host);
    props.setProperty("outbound.port", String.valueOf(port));
    props.setProperty("outbound.user", user);
    props.setProperty("outbound.password", password);
    props.setProperty("outbound.remote.directory", remoteDir);
    props.setProperty("outbound.local.directory", localDir);        

    PropertiesPropertySource pps = new PropertiesPropertySource("ftpprops", props);
    env.getPropertySources().addLast(pps);
    ctx.setEnvironment(env);


And this is my dsl class....

@Autowired
private DefaultSftpSessionFactory sftpSessionFactory;

@Bean
public DefaultSftpSessionFactory sftpSessionFactory(@Value("${outbound.host}") String host, @Value("${outbound.port}") int port,
        @Value("${outbound.user}") String user, @Value("${outbound.password}") String password
        ) {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(port);
    factory.setUser(user);
    factory.setPassword(password);      
    return factory;
}


@Bean
public IntegrationFlow fileInboundFlow(@Value("${outbound.local.directory}") String localDir)
{
    return IntegrationFlows
            .from(Files.inboundAdapter(new File(localDir)),
                    new Consumer<SourcePollingChannelAdapterSpec>() {

                        @Override
                        public void accept(SourcePollingChannelAdapterSpec e) {
                            e.autoStartup(true).poller(
                                    Pollers.fixedDelay(5000)
                                            .maxMessagesPerPoll(1));
                        }
                    })
                    .channel(sftpSendChannel())
                    .get();
}

@Bean
public IntegrationFlow sftpOutboundFlow(@Value("${outbound.remote.directory}") String remDir) {    
    return IntegrationFlows
            .from(sftpSendChannel())
            .handle(Sftp.outboundAdapter(this.sftpSessionFactory)                       
                    .useTemporaryFileName(false)
                    .remoteDirectory(remDir))
                    .get();
}

@Bean
public MessageChannel sftpSendChannel() {
    return new DirectChannel();
}

@Bean
public static PropertySourcesPlaceholderConfigurer configurer1() {
    return new PropertySourcesPlaceholderConfigurer();      
}


And this the error log from console...

2015 年 8 月 3 日7:50:25下午org.apache.catalina.core.StandardContextlistenerStart 严重:向 class org.springframework.web.context.ContextLoaderListener 的侦听器实例发送上下文初始化事件的异常 org.springframework.beans.factory.BeanCreationException:创建名称为 'sftpOutBoundDsl' 的 bean 时出错:注入自动装配的依赖项失败;嵌套异常是 org.springframework.beans.factory.BeanCreationException:无法自动装配字段:private org.springframework.integration.sftp.session.DefaultSftpSessionFactory com.tcs.iux.ieg.sftp.dynamic.SftpOutBoundDsl.sftpSessionFactory;嵌套异常是 java.lang.IllegalArgumentException:无法解析字符串值“${outbound.host}”中的占位符 'outbound.host' 在 org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:334) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1204) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:538) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476) 在 org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302) 在 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:229) 在 org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298) 在 org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193) 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:725) 在 org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:757) 在 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:480) 在 org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:403) 在 org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:306) 在 org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:106) 在 org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4973) 在 org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5467) 在 org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) 在 org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559) 在 org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549) 在 java.util.concurrent.FutureTask.run(FutureTask.java:262) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:744) Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.integration.sftp.session.DefaultSftpSessionFactory com.tcs.iux.ieg.sftp.dynamic.SftpOutBoundDsl.sftpSessionFactory;嵌套异常是 java.lang.IllegalArgumentException:无法解析字符串值“${outbound.host}”中的占位符 'outbound.host' 在 org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:555) 在 org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:87) 在 org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331) ... 还有 22 个 原因:java.lang.IllegalArgumentException:无法解析字符串值“${outbound.host}”中的占位符 'outbound.host' 在 org.springframework.util.PropertyPlaceholderHelper.parseStringValue(PropertyPlaceholderHelper.java:174) 在 org.springframework.util.PropertyPlaceholderHelper.replacePlaceholders(PropertyPlaceholderHelper.java:126) 在 org.springframework.core.env.AbstractPropertyResolver.doResolvePlaceholders(AbstractPropertyResolver.java:204) 在 org.springframework.core.env.AbstractPropertyResolver.resolveRequiredPlaceholders(AbstractPropertyResolver.java:178) 在 org.springframework.context.support.PropertySourcesPlaceholderConfigurer$2.resolveStringValue(PropertySourcesPlaceholderConfigurer.java:175) 在 org.springframework.beans.factory.support.AbstractBeanFactory.resolveEmbeddedValue(AbstractBeanFactory.java:800) 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:917) 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:904) 在 org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:815) 在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:743) 在 org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:466) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1113) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean 实例 (AbstractAutowireCapableBeanFactory.java:1008) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:505) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476) 在 org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302) 在 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:229) 在 org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298) 在 org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193) 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.findAutowireCandidates(DefaultListableBeanFactory.java:1088) 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1006) 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:904) 在 org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:527) ... 还有 24 个

目前不支持。

我们有一个 open JIRA to add support for dynamic server selection,但不太可能在即将发布的 4.2 版本中及时完成。

您可以通过编写自己的自定义委托会话工厂来解决这个问题,该工厂使用一些标准(例如 ThreadLocal)来确定要使用哪个委托工厂。

编辑:

与 XML 一样,您需要一个 PropertySourcesPlaceholderConfigurer bean。

您还应该使用工厂方法注入,因为 @Configuration class 创建得太早,无法注入 @Value...

@Configuration
public class FooConfig {

    @Bean
    public DefaultSftpSessionFactory factory(
            @Value("${inbound.host}") String host, 
            @Value("${inbound.port}") int port) {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(host);
        sf.setPort(port);
        return sf;
    }

    @Bean
    public PropertySourcesPlaceholderConfigurer configurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}

.

public class Testing {

    @Test
    public void test() {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(FooConfig.class);
        StandardEnvironment env = new StandardEnvironment();
        Properties props = new Properties();
        props.setProperty("inbound.host", "bar");
        props.setProperty("inbound.port", "23");
        PropertiesPropertySource pps = new PropertiesPropertySource("sftpprop", props);
        env.getPropertySources().addLast(pps);
        context.setEnvironment(env);
        context.refresh();
        DefaultSftpSessionFactory sessionFactory = context.getBean(DefaultSftpSessionFactory.class);
        assertEquals("bar", TestUtils.getPropertyValue(sessionFactory, "host"));
        context.close();
    }

}

顺便说一句,the delegating session factory will be in 4.2 after all

EDIT2:

你可以避免配置 class 的早期实例化并使用全局 @Value 注入,只要你制作 PSPC bean static...

@Configuration
public class FooConfig {

    @Value("${foo}")
    public String foo;

    @Bean
    public String earlyFoo() {
        return this.foo;
    }

    @Bean
    public String foo(@Value("${foo}") String foo) {
        return foo;
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer configurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}

在这种情况下,earlyFoo 按预期填充。