运行 Spring 启动调度程序和 Apache Camel 时出现问题
Problem while running Spring Boot Scheduler and Apache Camel
我正在尝试使用 Spring Boot 和 Apache Camel 文件组件的演示文件传输程序。我已经使用 Spring Boot 编写了一个调度程序,它将每 1 分钟 运行 调用 Apache Camel 路由并进行文件传输。我在目录 C:\CamelDemo\inputFolder 中有三个文件,即 input1.txt、input2.txt 和 input3.txt。我的 Spring 启动调度程序如下:
package com.example.demo.scheduler;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Scheduler {
@Autowired private ProducerTemplate producerTemplate;
@Scheduled(fixedRate=60000)
public void fixedRateSch() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Date now = new Date();
String strDate = sdf.format(now);
System.out.println("Fixed Rate scheduler:: " + strDate);
producerTemplate.sendBody("direct:transferFile", null);
System.out.println("Success");
}
}
我的路线如下:
package com.example.demo.route;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class FileTransferRoute extends RouteBuilder {
@SuppressWarnings("deprecation")
@Override
public void configure() {
errorHandler(defaultErrorHandler()
.maximumRedeliveries(3)
.redeliverDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.WARN));
from("direct:transferFile")
.log("Route reached")
.pollEnrich("file:C:\CamelDemo\inputFolder?noop=true")
.to("file:C:\CamelDemo\outputFolder?autoCreate=false")
.end();
}
}
当我运行程序时,调度程序运行ning 3次将三个文件一个一个地传输。在那之后,调度器就不再是 运行ning 了。然后,当我试图通过关闭嵌入式 Tomcal 来停止 Spring 引导应用程序时,它会出现以下错误:
2019-09-11 15:51:14.711 INFO 10408 --- [n(15)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested.
2019-09-11 15:51:14.714 INFO 10408 --- [n(15)-127.0.0.1] o.a.camel.spring.SpringCamelContext : Apache Camel 2.24.0 (CamelContext: camel-1) is shutting down
2019-09-11 15:51:14.714 INFO 10408 --- [n(15)-127.0.0.1] o.a.camel.impl.DefaultShutdownStrategy : Starting to graceful shutdown 1 routes (timeout 300 seconds)
2019-09-11 15:51:14.717 INFO 10408 --- [ - ShutdownTask] o.a.camel.impl.DefaultShutdownStrategy : Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 300 seconds. Inflights per route: [route1 = 1]
2019-09-11 15:51:14.718 INFO 10408 --- [ - ShutdownTask] o.a.camel.impl.DefaultShutdownStrategy : There are 1 inflight exchanges:
InflightExchange: [exchangeId=ID-PCIN467166-1568196927146-0-10, fromRouteId=route1, routeId=route1, nodeId=pollEnrich1, elapsed=0, duration=167106]
所以有以下问题:
1. 我怎样才能使调度程序 运行 永远,以便它会继续轮询文件位置,当文件夹中有新文件时,它会将文件传输到输出目录。
2. 如何正确关闭我想要的 Spring 引导应用程序以及为什么在关闭期间抛出错误?
3. 如何在第一个 运行 本身而不是一个接一个地同时传输所有三个文件?
对于问题 #3:我认为这是不可能的,请参阅 your other related question
中的评论
对于问题 #1 和 #2:了解此行为的关键点是您在 receive[= 中使用 pollEnrich
57=] 模式,因为你没有设置任何超时:请参阅 PollEnrich documentation 中关于超时的详细解释,特别是:
Good practice to use timeout value
=== By default Camel will use the receive. Which may block until there is a message available. It is therefore recommended to always provide
a timeout value, to make this clear that we may wait for a message,
until the timeout is hit. ===
回答您的问题:这是您当前路线设置的情况:
- 每隔 60 秒,Spring 调度程序通过向 "direct:transferFile" 通道发送一条带有 "null" 正文的虚拟消息来触发
FileTransferRoute
,
FileTransferRoute
检测并使用此消息:已创建交换,将在步骤 #1 创建的消息作为输入消息,
pollEnrich
FileTransferRoute
中的组件会阻塞,直到在输入目录中检测到新文件(因为未定义超时)
=> 这也将阻止 Spring 调度程序,因为您正在使用同步方法 producerTemplate.sendBody()
和 direct:
通道(direct
组件使用同步调用)
=> 这回答了你的第一个问题:Spring 调度程序实际上仍然是 运行,但它的线程在 sendBody()
方法调用中被阻塞,因为 pollEnrich
组件轮询新传入文件。输入目录中的下一个传入文件将恢复轮询;然后计划的方法应该完成,并且应该在 60 秒后触发新的调用。处理完第一个文件后,调度程序没有理由停止。
当您停止 Springboot 应用程序时,pollEnrich
组件仍然被阻止,轮询新的传入文件:这意味着仍有一个挂起的 Exchange(创建于上面的步骤 2) 尚未完成
=> 这回答了您的第二个问题:"inflight exchange" 实际上是由最新的调度程序调用创建的最新 Exchange。骆驼默认 shutdown strategy 将在关闭前等待 X 秒,以防挂起(飞行中)交换。
我建议你为 Camel 记录器启用 DEBUG 级别,你会更好地理解幕后发生的事情。
您可以通过在 pollEnrich 组件中使用超时来摆脱飞行交换问题并改进您的路线。由于您在专用 Spring 调度程序中实现调度程序逻辑,我认为您应该使用 receiveNoWait
模式(0s 超时)。
from("direct:transferFile").autoStartup(true)
.pollEnrich("file:C:\var\CamelDemo\inputFolder?noop=true&delay=2000", 0)
.choice()
.when(exchange -> exchange.getIn().getBody() != null)
.to("file:C:\var\CamelDemo\outputFolder?autoCreate=false")
// handle case where no file is found
.otherwise()
.log(" skip, no file found. ")
.end();
希望对您有所帮助。
我正在尝试使用 Spring Boot 和 Apache Camel 文件组件的演示文件传输程序。我已经使用 Spring Boot 编写了一个调度程序,它将每 1 分钟 运行 调用 Apache Camel 路由并进行文件传输。我在目录 C:\CamelDemo\inputFolder 中有三个文件,即 input1.txt、input2.txt 和 input3.txt。我的 Spring 启动调度程序如下:
package com.example.demo.scheduler;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Scheduler {
@Autowired private ProducerTemplate producerTemplate;
@Scheduled(fixedRate=60000)
public void fixedRateSch() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Date now = new Date();
String strDate = sdf.format(now);
System.out.println("Fixed Rate scheduler:: " + strDate);
producerTemplate.sendBody("direct:transferFile", null);
System.out.println("Success");
}
}
我的路线如下:
package com.example.demo.route;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class FileTransferRoute extends RouteBuilder {
@SuppressWarnings("deprecation")
@Override
public void configure() {
errorHandler(defaultErrorHandler()
.maximumRedeliveries(3)
.redeliverDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.WARN));
from("direct:transferFile")
.log("Route reached")
.pollEnrich("file:C:\CamelDemo\inputFolder?noop=true")
.to("file:C:\CamelDemo\outputFolder?autoCreate=false")
.end();
}
}
当我运行程序时,调度程序运行ning 3次将三个文件一个一个地传输。在那之后,调度器就不再是 运行ning 了。然后,当我试图通过关闭嵌入式 Tomcal 来停止 Spring 引导应用程序时,它会出现以下错误:
2019-09-11 15:51:14.711 INFO 10408 --- [n(15)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested.
2019-09-11 15:51:14.714 INFO 10408 --- [n(15)-127.0.0.1] o.a.camel.spring.SpringCamelContext : Apache Camel 2.24.0 (CamelContext: camel-1) is shutting down
2019-09-11 15:51:14.714 INFO 10408 --- [n(15)-127.0.0.1] o.a.camel.impl.DefaultShutdownStrategy : Starting to graceful shutdown 1 routes (timeout 300 seconds)
2019-09-11 15:51:14.717 INFO 10408 --- [ - ShutdownTask] o.a.camel.impl.DefaultShutdownStrategy : Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 300 seconds. Inflights per route: [route1 = 1]
2019-09-11 15:51:14.718 INFO 10408 --- [ - ShutdownTask] o.a.camel.impl.DefaultShutdownStrategy : There are 1 inflight exchanges:
InflightExchange: [exchangeId=ID-PCIN467166-1568196927146-0-10, fromRouteId=route1, routeId=route1, nodeId=pollEnrich1, elapsed=0, duration=167106]
所以有以下问题: 1. 我怎样才能使调度程序 运行 永远,以便它会继续轮询文件位置,当文件夹中有新文件时,它会将文件传输到输出目录。 2. 如何正确关闭我想要的 Spring 引导应用程序以及为什么在关闭期间抛出错误? 3. 如何在第一个 运行 本身而不是一个接一个地同时传输所有三个文件?
对于问题 #3:我认为这是不可能的,请参阅 your other related question
中的评论对于问题 #1 和 #2:了解此行为的关键点是您在 receive[= 中使用 pollEnrich
57=] 模式,因为你没有设置任何超时:请参阅 PollEnrich documentation 中关于超时的详细解释,特别是:
Good practice to use timeout value === By default Camel will use the receive. Which may block until there is a message available. It is therefore recommended to always provide a timeout value, to make this clear that we may wait for a message, until the timeout is hit. ===
回答您的问题:这是您当前路线设置的情况:
- 每隔 60 秒,Spring 调度程序通过向 "direct:transferFile" 通道发送一条带有 "null" 正文的虚拟消息来触发
FileTransferRoute
, FileTransferRoute
检测并使用此消息:已创建交换,将在步骤 #1 创建的消息作为输入消息,pollEnrich
FileTransferRoute
中的组件会阻塞,直到在输入目录中检测到新文件(因为未定义超时)=> 这也将阻止 Spring 调度程序,因为您正在使用同步方法
producerTemplate.sendBody()
和direct:
通道(direct
组件使用同步调用)=> 这回答了你的第一个问题:Spring 调度程序实际上仍然是 运行,但它的线程在
sendBody()
方法调用中被阻塞,因为pollEnrich
组件轮询新传入文件。输入目录中的下一个传入文件将恢复轮询;然后计划的方法应该完成,并且应该在 60 秒后触发新的调用。处理完第一个文件后,调度程序没有理由停止。当您停止 Springboot 应用程序时,
pollEnrich
组件仍然被阻止,轮询新的传入文件:这意味着仍有一个挂起的 Exchange(创建于上面的步骤 2) 尚未完成=> 这回答了您的第二个问题:"inflight exchange" 实际上是由最新的调度程序调用创建的最新 Exchange。骆驼默认 shutdown strategy 将在关闭前等待 X 秒,以防挂起(飞行中)交换。
我建议你为 Camel 记录器启用 DEBUG 级别,你会更好地理解幕后发生的事情。
您可以通过在 pollEnrich 组件中使用超时来摆脱飞行交换问题并改进您的路线。由于您在专用 Spring 调度程序中实现调度程序逻辑,我认为您应该使用 receiveNoWait
模式(0s 超时)。
from("direct:transferFile").autoStartup(true)
.pollEnrich("file:C:\var\CamelDemo\inputFolder?noop=true&delay=2000", 0)
.choice()
.when(exchange -> exchange.getIn().getBody() != null)
.to("file:C:\var\CamelDemo\outputFolder?autoCreate=false")
// handle case where no file is found
.otherwise()
.log(" skip, no file found. ")
.end();
希望对您有所帮助。