Apache Ignite:如何创建(基于 cron 的)计时器任务,而不让它在每个节点上都执行?
Apache Ignite. How to create a timer task (cron-based) without it being executed on every node?
如何使用 Apache Ignite 创建(基于 cron 的)计时器任务,使其不在每个节点上各执行一次,即在时间表的每个时间点只执行一次?
我的集群由 2 个节点和带有计时器任务的应用程序 (war) 组成。在非集群模式下,应用程序运行良好。但它有内部计时器任务(即每 5 分钟启动一次),可与共享资源一起使用。
我尝试过实现这一点。但是,如果两个应用程序实例都已启动(每个实例都会尝试启动相同的计时器任务),IgniteScheduler#scheduleLocal 会在每个节点上部署并运行该任务。
我认为 ignite 具有 部署任务的机制,id 为...
谢谢。
2018 年 12 月 18 日更新:
(感谢@alamar 的想法)
下面我展示了成功解决方案的源代码和测试:
IgniteTimerTest.java:
package com.Whosebug.question53780890.test;
import com.Whosebug.question53780890.JobRunner;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.resources.SpringResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.io.ClassPathResource;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Integration test for running a periodic job through an Ignite cluster-singleton service.
 *
 * <p>Two Spring contexts are started in the same JVM, each one configured from
 * {@code common.xml} plus its own properties file. The same service name is deployed
 * through both nodes, the first context is closed, and finally the number of job runs
 * recorded by {@link JobRunner} is checked against an expected range.
 */
public class IgniteTimerTest {

    /** Number of Spring contexts started by the test. */
    private static final int NODE_COUNT = 2;

    /**
     * Starts both contexts, deploys the timer service via each node, closes the first
     * context and verifies the total number of recorded job runs.
     *
     * @throws Exception if a context fails to start or the test thread is interrupted
     */
    @Test
    public void test() throws Exception {
        // Local executor so its threads are always released, even when the test fails.
        ExecutorService starter = Executors.newFixedThreadPool(NODE_COUNT);
        try {
            // Both contexts are created concurrently.
            Future<ConfigurableApplicationContext> applicationContextFutureOne =
                    starter.submit(() -> create(ConfigOne.class));
            Future<ConfigurableApplicationContext> applicationContextFutureTwo =
                    starter.submit(() -> create(ConfigTwo.class));

            try (ConfigurableApplicationContext applicationContextOne = applicationContextFutureOne.get();
                 ConfigurableApplicationContext applicationContextTwo = applicationContextFutureTwo.get()) {
                Ignite igniteOne = applicationContextOne.getBean(Ignite.class);
                Ignite igniteTwo = applicationContextTwo.getBean(Ignite.class);
                IgniteServices servicesOne = igniteOne.services();
                IgniteServices servicesTwo = igniteTwo.services();

                String serviceName = TestTimerRunner.class.getName();

                // Phase 1: deploy through the first node and let the job tick.
                servicesOne.deployClusterSingleton(serviceName, new TestTimerRunner());
                Thread.sleep(JobRunner.PERIOD * 3);

                // Phase 2: deploy the same service name through the second node.
                // NOTE(review): presumably this must not create a second running instance - confirm.
                servicesTwo.deployClusterSingleton(serviceName, new TestTimerRunner());
                Thread.sleep(JobRunner.PERIOD * 3);

                // Phase 3: close the first context while the second one stays up.
                // try-with-resources closes this context again at the end of the block.
                applicationContextOne.close();
                Thread.sleep(JobRunner.PERIOD * 3);

                int countValue = JobRunner.getConterValue();
                Assertions.assertTrue(9 <= countValue && countValue <= 11,
                        "Expected between 9 and 11 job runs, but counted " + countValue);
            }
        } finally {
            starter.shutdownNow();
        }
    }

    /**
     * Creates an annotation-based Spring context from the given configuration class.
     *
     * @param mainClass the {@code @Configuration} class to bootstrap
     * @return the newly created context
     */
    private static ConfigurableApplicationContext create(Class<?> mainClass) {
        return new AnnotationConfigApplicationContext(mainClass);
    }

    /**
     * Ignite service that delegates to the {@link JobRunner} bean: {@code execute}
     * starts the job and {@code cancel} stops it.
     *
     * <p>Declared {@code static} so that instances carry no hidden reference to the
     * enclosing (non-serializable) test instance.
     */
    static class TestTimerRunner implements Serializable, Service {

        private static final long serialVersionUID = 1L;

        /** Transient, so it is not serialized with the service instance. */
        @SpringResource(resourceClass = JobRunner.class)
        private transient JobRunner jobRunner;

        public TestTimerRunner() {
        }

        /** Stops the periodic job. */
        @Override
        public void cancel(ServiceContext ctx) {
            jobRunner.stop();
        }

        /** Nothing to initialize. */
        @Override
        public void init(ServiceContext ctx) throws Exception {
        }

        /** Starts the periodic job. */
        @Override
        public void execute(ServiceContext ctx) throws Exception {
            jobRunner.start();
        }
    }

    /** Spring configuration for the first node: {@code common.xml} plus {@code one.properties}. */
    @Configuration
    @ImportResource("classpath:common.xml")
    public static class ConfigOne {

        /** Resolves the {@code ${...}} placeholders from {@code one.properties}. */
        @Bean
        public static PropertySourcesPlaceholderConfigurer placeHolderConfigurer() {
            PropertySourcesPlaceholderConfigurer holder = new PropertySourcesPlaceholderConfigurer();
            holder.setLocation(new ClassPathResource("one.properties"));
            return holder;
        }
    }

    /** Spring configuration for the second node: {@code common.xml} plus {@code two.properties}. */
    @Configuration
    @ImportResource("classpath:common.xml")
    public static class ConfigTwo {

        /** Resolves the {@code ${...}} placeholders from {@code two.properties}. */
        @Bean
        public static PropertySourcesPlaceholderConfigurer placeHolderConfigurer() {
            PropertySourcesPlaceholderConfigurer holder = new PropertySourcesPlaceholderConfigurer();
            holder.setLocation(new ClassPathResource("two.properties"));
            return holder;
        }
    }
}
JobRunner.java:
package com.Whosebug.question53780890;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Periodic job that increments a JVM-wide counter every {@link #PERIOD} milliseconds
 * once {@link #start()} has been called, until {@link #stop()} is called.
 */
public class JobRunner implements Runnable {

    /** Period between runs, in milliseconds. */
    public static final long PERIOD = 5_000;

    /** Static, so every {@code JobRunner} instance in this JVM adds to the same count. */
    private static final AtomicInteger counter = new AtomicInteger();

    @Autowired
    private TaskScheduler taskScheduler;

    /** Handle of the active schedule, or {@code null} when the job is not scheduled. */
    private ScheduledFuture<?> job;

    /**
     * Returns the number of runs recorded so far.
     * The misspelled name is kept because existing callers use it.
     *
     * @return the current counter value
     */
    public static int getConterValue() {
        return counter.get();
    }

    /**
     * Schedules this job at a fixed rate of {@link #PERIOD} milliseconds.
     * Any schedule that is still active is cancelled first, so repeated calls
     * never leave more than one schedule running.
     */
    public synchronized void start() {
        cancelCurrentJob();
        job = taskScheduler.scheduleAtFixedRate(this, PERIOD);
    }

    /**
     * Cancels the active schedule. Does nothing if the job was never started
     * or has already been stopped.
     */
    public synchronized void stop() {
        cancelCurrentJob();
    }

    /** Records one run. */
    @Override
    public void run() {
        counter.incrementAndGet();
    }

    /** Cancels and clears the current schedule handle, if there is one. */
    private void cancelCurrentJob() {
        if (job != null) {
            job.cancel(true);
            job = null;
        }
    }
}
common.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:c="http://www.springframework.org/schema/c"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<!-- Shared bean definitions for every node. The ${...} placeholders are filled in
     from the per-node properties file (one.properties / two.properties). -->
<!-- Task infrastructure: an executor and a scheduler, each with a pool size of 5. -->
<task:annotation-driven executor="myExecutor" scheduler="myScheduler"/>
<task:executor id="myExecutor" pool-size="5"/>
<task:scheduler id="myScheduler" pool-size="5"/>
<!-- The periodic job bean. -->
<bean id="jobRunner" class="com.Whosebug.question53780890.JobRunner"/>
<!-- The Ignite instance exposed as a Spring bean, built from igniteConfiguration below. -->
<bean id="ignite" class="org.apache.ignite.IgniteSpringBean">
<property name="configuration" ref="igniteConfiguration"/>
</bean>
<bean id="igniteConfiguration" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="igniteInstanceName" value="${igniteInstanceName}"/>
<!-- TCP discovery over a fixed address list (igniteNodesAddresses). Each node binds
     exactly one discovery port: localPort with a localPortRange of 1. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"
p:addresses-ref="igniteNodesAddresses"/>
</property>
<!-- NOTE(review): joinTimeout is presumably in milliseconds; confirm against the Ignite docs. -->
<property name="joinTimeout" value="20000"/>
<property name="localPort" value="${igniteLocalPort}"/>
<property name="localPortRange" value="1"/>
</bean>
</property>
<!-- Per-node communication port. -->
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="localPort" value="${communicationSpiPort}"/>
</bean>
</property>
</bean>
<!-- Splits the comma-separated ${igniteNodes} value into a String array of addresses. -->
<bean id="igniteNodesAddresses" class="org.springframework.util.StringUtils"
factory-method="commaDelimitedListToStringArray">
<constructor-arg type="java.lang.String" value="${igniteNodes}"/>
</bean>
</beans>
one.properties:
igniteLocalPort=47501
communicationSpiPort=47101
igniteNodes=localhost:47501,localhost:47502
igniteInstanceName=one
two.properties:
igniteLocalPort=47502
communicationSpiPort=47102
igniteNodes=localhost:47501,localhost:47502
igniteInstanceName=two
我认为您可以部署一个 singleton service,它将在其 execute()
中执行 scheduleLocal 并在其 cancel()
方法中取消调度。这样它将故障转移到下一个节点并在当前节点离开时正确地重新安排您的任务。
如何使用 Apache Ignite 创建(基于 cron 的)计时器任务,使其不在每个节点上各执行一次,即在时间表的每个时间点只执行一次?
我的集群由 2 个节点和带有计时器任务的应用程序 (war) 组成。在非集群模式下,应用程序运行良好。但它有内部计时器任务(即每 5 分钟启动一次),可与共享资源一起使用。
我尝试过实现这一点。但是,如果两个应用程序实例都已启动(每个实例都会尝试启动相同的计时器任务),IgniteScheduler#scheduleLocal 会在每个节点上部署并运行该任务。
我认为 ignite 具有 部署任务的机制,id 为...
谢谢。
2018 年 12 月 18 日更新:
(感谢@alamar 的想法)
下面我展示了成功解决方案的源代码和测试:
IgniteTimerTest.java:
package com.Whosebug.question53780890.test;
import com.Whosebug.question53780890.JobRunner;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.resources.SpringResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.io.ClassPathResource;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Integration test for running a periodic job through an Ignite cluster-singleton service.
 *
 * <p>Two Spring contexts are started in the same JVM, each one configured from
 * {@code common.xml} plus its own properties file. The same service name is deployed
 * through both nodes, the first context is closed, and finally the number of job runs
 * recorded by {@link JobRunner} is checked against an expected range.
 */
public class IgniteTimerTest {

    /** Number of Spring contexts started by the test. */
    private static final int NODE_COUNT = 2;

    /**
     * Starts both contexts, deploys the timer service via each node, closes the first
     * context and verifies the total number of recorded job runs.
     *
     * @throws Exception if a context fails to start or the test thread is interrupted
     */
    @Test
    public void test() throws Exception {
        // Local executor so its threads are always released, even when the test fails.
        ExecutorService starter = Executors.newFixedThreadPool(NODE_COUNT);
        try {
            // Both contexts are created concurrently.
            Future<ConfigurableApplicationContext> applicationContextFutureOne =
                    starter.submit(() -> create(ConfigOne.class));
            Future<ConfigurableApplicationContext> applicationContextFutureTwo =
                    starter.submit(() -> create(ConfigTwo.class));

            try (ConfigurableApplicationContext applicationContextOne = applicationContextFutureOne.get();
                 ConfigurableApplicationContext applicationContextTwo = applicationContextFutureTwo.get()) {
                Ignite igniteOne = applicationContextOne.getBean(Ignite.class);
                Ignite igniteTwo = applicationContextTwo.getBean(Ignite.class);
                IgniteServices servicesOne = igniteOne.services();
                IgniteServices servicesTwo = igniteTwo.services();

                String serviceName = TestTimerRunner.class.getName();

                // Phase 1: deploy through the first node and let the job tick.
                servicesOne.deployClusterSingleton(serviceName, new TestTimerRunner());
                Thread.sleep(JobRunner.PERIOD * 3);

                // Phase 2: deploy the same service name through the second node.
                // NOTE(review): presumably this must not create a second running instance - confirm.
                servicesTwo.deployClusterSingleton(serviceName, new TestTimerRunner());
                Thread.sleep(JobRunner.PERIOD * 3);

                // Phase 3: close the first context while the second one stays up.
                // try-with-resources closes this context again at the end of the block.
                applicationContextOne.close();
                Thread.sleep(JobRunner.PERIOD * 3);

                int countValue = JobRunner.getConterValue();
                Assertions.assertTrue(9 <= countValue && countValue <= 11,
                        "Expected between 9 and 11 job runs, but counted " + countValue);
            }
        } finally {
            starter.shutdownNow();
        }
    }

    /**
     * Creates an annotation-based Spring context from the given configuration class.
     *
     * @param mainClass the {@code @Configuration} class to bootstrap
     * @return the newly created context
     */
    private static ConfigurableApplicationContext create(Class<?> mainClass) {
        return new AnnotationConfigApplicationContext(mainClass);
    }

    /**
     * Ignite service that delegates to the {@link JobRunner} bean: {@code execute}
     * starts the job and {@code cancel} stops it.
     *
     * <p>Declared {@code static} so that instances carry no hidden reference to the
     * enclosing (non-serializable) test instance.
     */
    static class TestTimerRunner implements Serializable, Service {

        private static final long serialVersionUID = 1L;

        /** Transient, so it is not serialized with the service instance. */
        @SpringResource(resourceClass = JobRunner.class)
        private transient JobRunner jobRunner;

        public TestTimerRunner() {
        }

        /** Stops the periodic job. */
        @Override
        public void cancel(ServiceContext ctx) {
            jobRunner.stop();
        }

        /** Nothing to initialize. */
        @Override
        public void init(ServiceContext ctx) throws Exception {
        }

        /** Starts the periodic job. */
        @Override
        public void execute(ServiceContext ctx) throws Exception {
            jobRunner.start();
        }
    }

    /** Spring configuration for the first node: {@code common.xml} plus {@code one.properties}. */
    @Configuration
    @ImportResource("classpath:common.xml")
    public static class ConfigOne {

        /** Resolves the {@code ${...}} placeholders from {@code one.properties}. */
        @Bean
        public static PropertySourcesPlaceholderConfigurer placeHolderConfigurer() {
            PropertySourcesPlaceholderConfigurer holder = new PropertySourcesPlaceholderConfigurer();
            holder.setLocation(new ClassPathResource("one.properties"));
            return holder;
        }
    }

    /** Spring configuration for the second node: {@code common.xml} plus {@code two.properties}. */
    @Configuration
    @ImportResource("classpath:common.xml")
    public static class ConfigTwo {

        /** Resolves the {@code ${...}} placeholders from {@code two.properties}. */
        @Bean
        public static PropertySourcesPlaceholderConfigurer placeHolderConfigurer() {
            PropertySourcesPlaceholderConfigurer holder = new PropertySourcesPlaceholderConfigurer();
            holder.setLocation(new ClassPathResource("two.properties"));
            return holder;
        }
    }
}
JobRunner.java:
package com.Whosebug.question53780890;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Periodic job that increments a JVM-wide counter every {@link #PERIOD} milliseconds
 * once {@link #start()} has been called, until {@link #stop()} is called.
 */
public class JobRunner implements Runnable {

    /** Period between runs, in milliseconds. */
    public static final long PERIOD = 5_000;

    /** Static, so every {@code JobRunner} instance in this JVM adds to the same count. */
    private static final AtomicInteger counter = new AtomicInteger();

    @Autowired
    private TaskScheduler taskScheduler;

    /** Handle of the active schedule, or {@code null} when the job is not scheduled. */
    private ScheduledFuture<?> job;

    /**
     * Returns the number of runs recorded so far.
     * The misspelled name is kept because existing callers use it.
     *
     * @return the current counter value
     */
    public static int getConterValue() {
        return counter.get();
    }

    /**
     * Schedules this job at a fixed rate of {@link #PERIOD} milliseconds.
     * Any schedule that is still active is cancelled first, so repeated calls
     * never leave more than one schedule running.
     */
    public synchronized void start() {
        cancelCurrentJob();
        job = taskScheduler.scheduleAtFixedRate(this, PERIOD);
    }

    /**
     * Cancels the active schedule. Does nothing if the job was never started
     * or has already been stopped.
     */
    public synchronized void stop() {
        cancelCurrentJob();
    }

    /** Records one run. */
    @Override
    public void run() {
        counter.incrementAndGet();
    }

    /** Cancels and clears the current schedule handle, if there is one. */
    private void cancelCurrentJob() {
        if (job != null) {
            job.cancel(true);
            job = null;
        }
    }
}
common.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:c="http://www.springframework.org/schema/c"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<!-- Shared bean definitions for every node. The ${...} placeholders are filled in
     from the per-node properties file (one.properties / two.properties). -->
<!-- Task infrastructure: an executor and a scheduler, each with a pool size of 5. -->
<task:annotation-driven executor="myExecutor" scheduler="myScheduler"/>
<task:executor id="myExecutor" pool-size="5"/>
<task:scheduler id="myScheduler" pool-size="5"/>
<!-- The periodic job bean. -->
<bean id="jobRunner" class="com.Whosebug.question53780890.JobRunner"/>
<!-- The Ignite instance exposed as a Spring bean, built from igniteConfiguration below. -->
<bean id="ignite" class="org.apache.ignite.IgniteSpringBean">
<property name="configuration" ref="igniteConfiguration"/>
</bean>
<bean id="igniteConfiguration" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="igniteInstanceName" value="${igniteInstanceName}"/>
<!-- TCP discovery over a fixed address list (igniteNodesAddresses). Each node binds
     exactly one discovery port: localPort with a localPortRange of 1. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"
p:addresses-ref="igniteNodesAddresses"/>
</property>
<!-- NOTE(review): joinTimeout is presumably in milliseconds; confirm against the Ignite docs. -->
<property name="joinTimeout" value="20000"/>
<property name="localPort" value="${igniteLocalPort}"/>
<property name="localPortRange" value="1"/>
</bean>
</property>
<!-- Per-node communication port. -->
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="localPort" value="${communicationSpiPort}"/>
</bean>
</property>
</bean>
<!-- Splits the comma-separated ${igniteNodes} value into a String array of addresses. -->
<bean id="igniteNodesAddresses" class="org.springframework.util.StringUtils"
factory-method="commaDelimitedListToStringArray">
<constructor-arg type="java.lang.String" value="${igniteNodes}"/>
</bean>
</beans>
one.properties:
igniteLocalPort=47501
communicationSpiPort=47101
igniteNodes=localhost:47501,localhost:47502
igniteInstanceName=one
two.properties:
igniteLocalPort=47502
communicationSpiPort=47102
igniteNodes=localhost:47501,localhost:47502
igniteInstanceName=two
我认为您可以部署一个 singleton service,它将在其 execute()
中执行 scheduleLocal 并在其 cancel()
方法中取消调度。这样它将故障转移到下一个节点并在当前节点离开时正确地重新安排您的任务。