How to get a Scala Kafka Consumer in a Play application to continuously listen to the broker through the life of the application?

I am trying to create a Play-Scala application that uses a Scala Kafka consumer to listen to a Kafka broker. I am using the Cake Solutions Scala Kafka Client library and following their example.

I created a container class that acts as the Kafka consumer provider, and I bind it as an eager singleton so that it is created when the application starts.

The problem is that the consumer listens to the broker when the application starts, but not after that.

Here is my ConsumerProvider code:

trait KafkaConsumerProvider {

  def consumer: ActorRef

}

@Singleton
class KafkaConsumerProviderImpl @Inject() (actorSystem: ActorSystem, configuration: Configuration)
    extends KafkaConsumerProvider {

  private val consumerConf: KafkaConsumer.Conf[String, String] = KafkaConsumer.Conf(
    keyDeserializer = new StringDeserializer,
    valueDeserializer = new StringDeserializer,
    bootstrapServers = configuration.get[String]("messageBroker.bootstrapServers"),
    groupId = configuration.get[String]("messageBroker.consumer.groupId"),
    enableAutoCommit = false,
    autoCommitInterval= 1000,
    sessionTimeoutMs = 10000,
    maxPartitionFetchBytes = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
    maxPollRecords = 500,
    maxPollInterval = 300000,
    maxMetaDataAge  = 300000,
    autoOffsetReset = OffsetResetStrategy.LATEST,
    isolationLevel = IsolationLevel.READ_UNCOMMITTED,
  )

  private val actorConf: KafkaConsumerActor.Conf = KafkaConsumerActor.Conf(
    scheduleInterval = 1.seconds,   // scheduling interval for Kafka polling when consumer is inactive
    unconfirmedTimeout = 3.seconds, // duration for how long to wait for a confirmation before redelivery
    maxRedeliveries = 3             // maximum number of times an unconfirmed message will be redelivered
  )

  override val consumer: ActorRef = {
    val receiverActor = actorSystem.actorOf(ReceiverActor.props)
    val topics = configuration.get[String]("messageBroker.consumer.topics").split(",").toSeq
    val _consumer = actorSystem.actorOf(KafkaConsumerActor.props(consumerConf, actorConf, receiverActor))
    _consumer ! Subscribe.AutoPartition(topics)
    _consumer
  }

}

Here is how I bind the dependencies as eager singletons in Module.scala:

class Module extends AbstractModule with ScalaModule {

  override def configure(): Unit = {
    bind[KafkaMessageBrokerWriter].to[KafkaMessageBrokerWriterImpl].asEagerSingleton()
    bind[KafkaConsumerProvider].to[KafkaConsumerProviderImpl].asEagerSingleton()
  }

}

How do I get the consumer to keep listening?

The problem was that, in ReceiverActor, I forgot to confirm the offsets:

sender() ! Confirm(records.offsets)
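
For context, here is a minimal sketch of what a ReceiverActor with that confirmation might look like. The question does not show the actual ReceiverActor, so the class body, the log message, and the use of ConsumerRecords.extractor are assumptions based on the library's Akka integration; only the Confirm line comes from the fix above. The idea is that the KafkaConsumerActor holds back the next batch until the current one has been confirmed, which would explain why records arrived only once at startup.

```scala
import akka.actor.{Actor, ActorLogging, Props}
import cakesolutions.kafka.akka.ConsumerRecords
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm

// Hypothetical receiver: the real ReceiverActor is not shown in the question.
class ReceiverActor extends Actor with ActorLogging {

  // Extractor that matches incoming batches with String keys and String values
  private val recordsExt = ConsumerRecords.extractor[String, String]

  override def receive: Receive = {
    case recordsExt(records) =>
      // Placeholder processing: log each (optional key, value) pair
      records.pairs.foreach { case (key, value) =>
        log.info("Received record: key={}, value={}", key, value)
      }
      // Confirm the batch so the consumer actor can deliver the next one
      sender() ! Confirm(records.offsets)
  }
}

object ReceiverActor {
  def props: Props = Props(new ReceiverActor)
}
```

Since enableAutoCommit is false in the consumer configuration above, it may also be worth checking whether you want Confirm(records.offsets, commit = true), so that the confirmed offsets are committed back to Kafka as well.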