Spring 集成中的 REST 端点使消息传递通道成为多线程
REST endpoints in Spring Integration make messaging channels multithreaded
我有一个非常简单的 Spring 引导应用程序,它提供了几个 restful 端点,这应该驱动一个 sftp 文件上传到 sftp 服务器。我的要求是,如果有多个文件,则文件应该排队。我希望通过 sftp spring 集成工作流的默认行为来实现这一点,因为我读到 DirectChannel 会自动对文件进行排队。为了测试我执行以下操作的行为:
- 发送一个大文件,通过调用端点阻塞通道一段时间。
- 通过调用端点发送较小的文件。
预期结果:小文件排队到通道,待大文件上传完成后处理。
实际结果:打开一个新的sftp服务器连接,较小的文件上传到那里,没有排队,而较大的文件继续传输。
我的应用程序中有两个文件:
DemoApplication.java
@SpringBootApplication
@IntegrationComponentScan
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(22);
factory.setUser("tester");
factory.setPassword("password");
factory.setAllowUnknownKeys(true);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(File file);
}
}
DemoController.java
@RestController
public class DemoController {
@Autowired
MyGateway gateway;
@RequestMapping("/sendFile")
public void sendFile() {
File file = new File("C:/smallFile.txt");
gateway.sendToSftp(file);
}
@RequestMapping("/sendBigFile")
public void sendBigFile() {
File file = new File("D:/bigFile.zip");
gateway.sendToSftp(file);
}
}
我是 spring 的新手,我不太确定我的 sftp 通道是否在这里正确创建,我的猜测是每次调用 sendToSftp 时都会创建一个新通道.在这种情况下如何实现队列行为的任何帮助将不胜感激。
这里没有 queue,因为每个 HTTP 请求都是在其自己的线程中执行的。是的,当 http 线程池耗尽时,您可能 queue 仍然在那里,但是在您只有两个请求的简单 use-case 中看起来并不如此。
无论如何,您都可以在那里实现 queue 行为,但是您应该将 toSftpChannel
声明为 QueueChannel
bean。
这样下游进程将始终在同一个线程上执行,下一条消息将在第一个消息之后从 queue 中拉出。
有关详细信息,请参阅 Reference Manual。
更新
因为你使用了 FtpMessageHandler
组件,它是 one-way 组件,但是你仍然需要对 MVC 控制器的方法进行一些回复,唯一的方法就是使用 @Gateway
方法使用非 void
return 当然我们需要以某种方式发送回复。
为此,我建议使用 PublishSubscribeChannel
:
@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
return handler;
}
这样我们就有了 toSftpChannel
的两个订阅者。使用 @Order(0)
我们确保 @ServiceActivator
是第一个订阅者,因为我们需要首先执行 SFTP 传输。使用 @BridgeTo
我们将第二个 BridgeHandler
添加到相同的 PublishSubscribeChannel
。它的目的只是得到一个replyChannel
header 并在那里发送请求消息。由于我们不使用任何线程,BridgeHandler
将在传输到 SFTP 完成后执行。
当然,除了 BridgeHandler
,您还可以将任何其他 @ServiceActivator
或 @Transfromer
作为对 return 的回复,而不是请求 File
,而是任何内容别的。例如:
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(1)
public String transferComplete(File payload) {
return "The SFTP transfer complete for file: " + payload;
}
我有一个非常简单的 Spring 引导应用程序,它提供了几个 restful 端点,这应该驱动一个 sftp 文件上传到 sftp 服务器。我的要求是,如果有多个文件,则文件应该排队。我希望通过 sftp spring 集成工作流的默认行为来实现这一点,因为我读到 DirectChannel 会自动对文件进行排队。为了测试我执行以下操作的行为:
- 发送一个大文件,通过调用端点阻塞通道一段时间。
- 通过调用端点发送较小的文件。
预期结果:小文件排队到通道,待大文件上传完成后处理。 实际结果:打开一个新的sftp服务器连接,较小的文件上传到那里,没有排队,而较大的文件继续传输。
我的应用程序中有两个文件:
DemoApplication.java
@SpringBootApplication
@IntegrationComponentScan
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(22);
factory.setUser("tester");
factory.setPassword("password");
factory.setAllowUnknownKeys(true);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(File file);
}
}
DemoController.java
@RestController
public class DemoController {
@Autowired
MyGateway gateway;
@RequestMapping("/sendFile")
public void sendFile() {
File file = new File("C:/smallFile.txt");
gateway.sendToSftp(file);
}
@RequestMapping("/sendBigFile")
public void sendBigFile() {
File file = new File("D:/bigFile.zip");
gateway.sendToSftp(file);
}
}
我是 spring 的新手,我不太确定我的 sftp 通道是否在这里正确创建,我的猜测是每次调用 sendToSftp 时都会创建一个新通道.在这种情况下如何实现队列行为的任何帮助将不胜感激。
这里没有 queue,因为每个 HTTP 请求都是在其自己的线程中执行的。是的,当 http 线程池耗尽时,您可能 queue 仍然在那里,但是在您只有两个请求的简单 use-case 中看起来并不如此。
无论如何,您都可以在那里实现 queue 行为,但是您应该将 toSftpChannel
声明为 QueueChannel
bean。
这样下游进程将始终在同一个线程上执行,下一条消息将在第一个消息之后从 queue 中拉出。
有关详细信息,请参阅 Reference Manual。
更新
因为你使用了 FtpMessageHandler
组件,它是 one-way 组件,但是你仍然需要对 MVC 控制器的方法进行一些回复,唯一的方法就是使用 @Gateway
方法使用非 void
return 当然我们需要以某种方式发送回复。
为此,我建议使用 PublishSubscribeChannel
:
@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
return handler;
}
这样我们就有了 toSftpChannel
的两个订阅者。使用 @Order(0)
我们确保 @ServiceActivator
是第一个订阅者,因为我们需要首先执行 SFTP 传输。使用 @BridgeTo
我们将第二个 BridgeHandler
添加到相同的 PublishSubscribeChannel
。它的目的只是得到一个replyChannel
header 并在那里发送请求消息。由于我们不使用任何线程,BridgeHandler
将在传输到 SFTP 完成后执行。
当然,除了 BridgeHandler
,您还可以将任何其他 @ServiceActivator
或 @Transfromer
作为对 return 的回复,而不是请求 File
,而是任何内容别的。例如:
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(1)
public String transferComplete(File payload) {
return "The SFTP transfer complete for file: " + payload;
}