How to get a Scala Kafka Consumer in a Play application to continuously listen to the broker through the life of the application?
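I am trying to create a Play-Scala application that uses a Scala Kafka consumer to listen to a Kafka broker. I am using the Cake Solutions Scala Kafka Client library and following their example.
I created a container class as a Kafka consumer provider and bound it as an eager singleton so that it is created at application startup.
The problem is that the consumer listens to the broker when the application starts, but not after that.
Here is my ConsumerProvider code: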
import javax.inject.{Inject, Singleton}

import akka.actor.{ActorRef, ActorSystem}
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor
import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}
// The package of IsolationLevel depends on the Kafka client version in use.
import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.Configuration

import scala.concurrent.duration._

trait KafkaConsumerProvider {
  def consumer: ActorRef
}

@Singleton
class KafkaConsumerProviderImpl @Inject() (actorSystem: ActorSystem, configuration: Configuration)
    extends KafkaConsumerProvider {

  private val consumerConf: KafkaConsumer.Conf[String, String] = KafkaConsumer.Conf(
    keyDeserializer = new StringDeserializer,
    valueDeserializer = new StringDeserializer,
    bootstrapServers = configuration.get[String]("messageBroker.bootstrapServers"),
    groupId = configuration.get[String]("messageBroker.consumer.groupId"),
    enableAutoCommit = false,
    autoCommitInterval = 1000,
    sessionTimeoutMs = 10000,
    maxPartitionFetchBytes = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
    maxPollRecords = 500,
    maxPollInterval = 300000,
    maxMetaDataAge = 300000,
    autoOffsetReset = OffsetResetStrategy.LATEST,
    isolationLevel = IsolationLevel.READ_UNCOMMITTED
  )

  private val actorConf: KafkaConsumerActor.Conf = KafkaConsumerActor.Conf(
    scheduleInterval = 1.seconds,   // scheduling interval for Kafka polling when consumer is inactive
    unconfirmedTimeout = 3.seconds, // duration for how long to wait for a confirmation before redelivery
    maxRedeliveries = 3             // maximum number of times an unconfirmed message will be redelivered
  )

  // Created eagerly at startup: spawns the receiver and the consumer actor, then subscribes.
  override val consumer: ActorRef = {
    val receiverActor = actorSystem.actorOf(ReceiverActor.props)
    val topics = configuration.get[String]("messageBroker.consumer.topics").split(",").toSeq
    val _consumer = actorSystem.actorOf(KafkaConsumerActor.props(consumerConf, actorConf, receiverActor))
    _consumer ! Subscribe.AutoPartition(topics)
    _consumer
  }
}
Here is how I bind the dependencies as eager singletons in Module.scala:
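import com.google.inject.AbstractModule
import net.codingwell.scalaguice.ScalaModule

class Module extends AbstractModule with ScalaModule {
  override def configure(): Unit = {
    // Eager singletons are instantiated when the application starts.
    bind[KafkaMessageBrokerWriter].to[KafkaMessageBrokerWriterImpl].asEagerSingleton()
    bind[KafkaConsumerProvider].to[KafkaConsumerProviderImpl].asEagerSingleton()
  }
}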
How do I get the consumer to keep listening?
The problem was that, in ReceiverActor, I had forgotten to confirm the offsets:
sender() ! Confirm(records.offsets)
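For context, here is a minimal sketch of what a ReceiverActor with that confirmation might look like. The original ReceiverActor is not shown in the question, so this follows the receiver pattern from the Scala Kafka Client README. The logging step is a hypothetical placeholder for real processing. As the unconfirmedTimeout and maxRedeliveries settings above suggest, the consumer actor appears to wait for a Confirm on each batch before moving on, which would explain why messages stopped arriving after the first delivery.

```scala
import akka.actor.{Actor, ActorLogging, Props}
import cakesolutions.kafka.akka.ConsumerRecords
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm

object ReceiverActor {
  // Matches the ReceiverActor.props call used in KafkaConsumerProviderImpl.
  def props: Props = Props(new ReceiverActor)
}

class ReceiverActor extends Actor with ActorLogging {

  // Type-safe extractor for record batches with String keys and String values.
  private val recordsExt = ConsumerRecords.extractor[String, String]

  override def receive: Receive = {
    case recordsExt(records) =>
      // Hypothetical processing step: log each (optional key, value) pair.
      records.pairs.foreach { case (key, value) =>
        log.info("Received key={} value={}", key, value)
      }
      // Acknowledge the batch so the consumer actor can deliver the next one.
      sender() ! Confirm(records.offsets)
  }
}
```

The confirmation is sent only after the batch has been processed, so a failure during processing leaves the batch unconfirmed and eligible for redelivery.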