如何启动使用分区的 Spring 批处理应用程序的从站?

How to start the slaves of a Spring Batch application that uses partitioning?

我们正在使用 Spring 批处理分区在两个 JVM 上并行处理多个输入文件。一个JVM中有一主一从运行,另一个JVM上有另一个从运行。

在第一个 JVM 上启动主服务器和从服务器是通过启动 Spring 启动应用程序并传递作业名称来完成的,就像启动任何其他批处理作业一样。

我们通过启动传递虚拟作业名称的 Spring 启动应用程序在第二个 JVM 上启动从站。 slave没有job config,只有接收消息的inbound flow,stepExecutionRequestHandler,step code

结果:所有slave bean初始化成功,slave consumer收到消息,启动stepExecutionRequestHandler,创建DB连接失败,没有任何错误。如果我将作业配置添加到从机并通过正确的作业名称启动作业,问题就不会发生,这让我认为问题可能与没有启动真正的 Spring 批处理作业有关,这应该是初始化一些需要的资源。我确实验证了 datasourceConfiguration 和 datasource bean 是否已初始化,这是作为单独模块的一部分完成的。

所以我想知道我是否以正确的方式启动奴隶,或者是否有更好的方式来启动他们。

这是从站的配置:


  /*
   * Configure inbound flow (requests coming from the master)
   */

  @Bean
  public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
  }

  @Bean
  public StepLocator stepLocator() {
    BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
    beanFactoryStepLocator.setBeanFactory(beanFactory);

    return beanFactoryStepLocator;
  }

 @Bean
  @ServiceActivator(inputChannel = "inboundRequests")
  public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
  }

  @Bean
  public MessageChannel inboundRequests() {
    return new DirectChannel();
  }

  @Bean
  public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
                                           @Qualifier("inboundRequests") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
  }

  @Bean
  public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
      new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames(this.requestQueue);
    container.setPrefetchCount(1);

    return container;
  }

combineReleaseJobNormalStep CODE ... 

这里是master的配置:


  @Bean
  public Job combineReleaseJob() throws Exception {
    return jobBuilderFactory.get("CombineReleaseJob")
      .incrementer(new RunIdIncrementer())
      .listener(resourceLoader)
      .listener(combineReleaseJobJobContextPreparer())
      .flow(combineReleaseJobCL31401())
      .from(combineReleaseJobCL31401()).on("N").to(combineReleaseJobNormalStepManager())
      .from(combineReleaseJobCL31401()).on("R").end()
      .from(combineReleaseJobNormalStepManager()).on("COMPLETED").to(combineReleaseJobAddressTableCheck())
      .from(combineReleaseJobNormalStepManager()).on("FAILED").fail()
      .end().build();
  }

  @Bean
  public Step combineReleaseJobNormalStepManager() throws Exception {
    return stepBuilderFactory.get("combineReleaseJobNormalStep.Manager")
      .partitioner("combineReleaseJobNormalStep",partitioner())
      .partitionHandler(partitionHandler())
      .build();
  }


  @Bean
  public PartitionHandler partitionHandler() throws Exception {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();

    partitionHandler.setStepName("combineReleaseJobNormalStep");
    partitionHandler.setGridSize(GRID_SIZE);
    partitionHandler.setMessagingOperations(messageTemplate());
    //partitionHandler.setPollInterval(5000l);
    partitionHandler.setJobExplorer(this.jobExplorer);

    partitionHandler.afterPropertiesSet();

    return partitionHandler;
  }

  @Bean
  public MessagingTemplate messageTemplate() {
    MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());

    messagingTemplate.setReceiveTimeout(60000000l);

    return messagingTemplate;
  }

  /*
   * Configure outbound flow (requests going to slaves)
   */
  @Bean
  public MessageChannel outboundRequests() {
    return new DirectChannel();
  }

  @Bean
  public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
    return IntegrationFlows
      .from(outboundRequests())
      .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(this.requestQueue))
      .get();
  }

If I add the job config to the slave and start the job passing the right job name, the problem does not happen, which makes me think that the problem may be related to not starting a real Spring Batch job,

您不需要 运行 整个 Spring 工人端的批处理作业。工作通常在主端开始,工作端只需要工作步骤。请参阅参考文档的 Remote partitioning 部分。

How to start the slaves of a Spring Batch application that uses partitioning?

Workers 可以作为常规 Spring(引导)应用程序启动,其中 StepExecutionRequestHandler(通常配置为 Spring 集成服务激活器)侦听传入的 StepExecutionRequest s 并执行工作步骤(位于 StepLocator)。

你可以在演讲中找到完整的例子集High Performance Batch Processing which I co-presented with Michael at SpringOne 2018. The source code of the examples can be found here: https://github.com/mminella/scaling-demos/tree/sp1-2018

我们发现我们正在将从属作为批处理作业启动,因为它最初是作为主添加到同一个批处理应用程序中的。作业名称是批处理应用程序中的必需参数,它对 Spring 批处理工具和 jar 有很多依赖性,因此它试图以某种方式启动批处理作业,并且由于作业配置不存在,它会在处理过程中失败并关闭所有 beans 和资源,导致上述数据库连接问题。当我们 运行 作为守护程序或常规 spring 引导应用程序的从站时,它会正常启动并执行步骤直至完成。

为了避免大量返工并从应用程序中删除所有批处理依赖项以使其 运行 作为常规 Spring 启动应用程序,我们使用此代码使其 运行 作为守护进程:

@SpringBootApplication
@IntegrationComponentScan
public class Application implements CommandLineRunner {

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
  }

  @Override
  public void run(String... args) throws Exception {
    System.out.println("Joining thread, you can press Ctrl+C to shutdown application");
    Thread.currentThread().join();
  }

}

``