Spring 集成 JMS 消费者未使用所有消息

Spring Integration JMS Consumers not consuming all messages

设置

我有 Spring 名为 Dispatcher 的启动应用程序。它在 1 台机器上运行并具有嵌入式 ActiveMQ Broker:

  @Bean
  public BrokerService broker(ActiveMQProperties properties) throws Exception {
    BrokerService broker = new BrokerService();
    broker.setPersistent(false);
    broker.addConnector(properties.getBrokerUrl());
    return broker;
  }

将任务写入 JMS 队列:

  @Bean
  public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
      .from(taskQueue())
      .bridge(Bridges.blockingPoller(outboundTaskScheduler()))
      .transform(outboundTransformer)
      .handle(Jms.outboundAdapter(connectionFactory)
        .extractPayload(false)
        .destination(JmsQueueNames.STANDARD_TASKS))
      .get();
  }

  @Bean
  public QueueChannel standardTaskQueue() {
    return MessageChannels.priority()
      .comparator(TASK_PRIO_COMPARATOR)
      .get();
  }

  // 2 more queues with different names but same config

Worker 应用程序在 10 台机器上运行,每台机器有 20 个内核,配置如下:

  @Bean
  public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
    int maxWorkers = 20;
    return IntegrationFlows
      .from(Jms.channel(connectionFactory)
        .sessionTransacted(true)
        .concurrentConsumers(maxWorkers)
        .taskExecutor(
          Executors.newFixedThreadPool(maxWorkers, new CustomizableThreadFactory("standard-"))
        )
        .destination(JmsQueueNames.STANDARD_TASKS))
      .channel(ChannelNames.TASKS_INBOUND)
      .get();
  }

  // 2 more inbound queues with different names but same config

第二个队列重复此操作,外加 1 个特殊情况。所以 共有 401 个消费者

观察

使用JConsole,我可以看到ActiveMQ队列中有任务:

[TODO 插入截图]

不出所料,在任何 Worker 机器上,都有 20 个消费者线程:

[TODO 插入截图]

但大多数(如果不是全部)空闲,即使队列中仍有消息。使用我们的监控工具,我看到在任何给定时间都在处理大约 50 到 400 个任务,而期望值是恒定的 400。

我还观察到 Spring 为每个消费者创建 AbstractPollingMessageListenerContainer,这似乎导致每个应用程序每个队列每秒打开 1 个 JMS 连接(每秒 33 个连接)。

调查

所以我发现 I do not receive messages in my second consumer 暗示 prefetch 是罪魁祸首。这听起来很有道理,所以我在每个 worker 上配置了 tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1。然而,在任何时候都只处理了大约 25 个任务,这对我来说完全没有意义。

问题

我似乎不明白发生了什么,而且由于我 运行 没有时间进行调查,我希望任何人都可以为我指出正确的方向。哪些因素可能是原因? consumers/connections的数量?预取?还有什么吗?

原来是预取策略导致的。在我的例子中正确的配置是使用 tcp://dispatcher:61616?jms.prefetchPolicy.all=0

在我之前的(失败的)测试中我使用了 jms.prefetchPolicy.queuePrefetch=1 但事后我不确定我是否在正确的地方配置它。