使用 Akka Streams Kafka 处理流终止
Handling Streams Termination with Akka Streams Kafka
我正在使用 Akka Streams Kafka 库与 Kafka 代理交互。我有以下带有 UnBounded 缓冲区的流定义:
def producerStream[T: MessageType](producerProperties: Map[String, String]) = {
val streamSource = source[T](producerProperties)
val streamFlow = flow[T](producerProperties)
val streamSink = sink(producerProperties)
streamSource.via(streamFlow).to(streamSink).run()
}
我从 Actor 实例中调用 producerStream。我有一条消息类型:
case class InitiateStream[T](publisher: ActorRef, anyMessage: T)
在我的 Actor 中,我有以下接收块:
override def receive: Receive = super.receive orElse {
case StartProducerStream(publisherActor, DefaultMessage) =>
producerStream[DefaultMessage](cfg.producerProps)
// context.become(active)
case other => println(s"SHIT !! Got unknown message: $other")
}
publisherActor 最终将获得应该推送到相应 Kafka 主题的消息。
现在我的问题是,我是否需要为关闭 producerStream 而烦恼?
我正在考虑在启动流后立即执行 context.become(active(producerStream)),在 active 方法中,我将根据 DestroyStream 消息处理流终止。这个有必要吗?大家怎么看?
如果您使用 Akka Kafka 使用者,只要主题中的事件可用,源就会无限期地发出事件。为什么需要关闭生产者?
我建议调用以下内容来启动您的来源:
def producerSource[T: MessageType](producerProperties: Map[String, String]) = {
val streamSource = source[T](producerProperties)
val streamFlow = flow[T](producerProperties)
val streamSink = sink(producerProperties)
streamSource.via(streamFlow)
}
def startStream[A](source: Source[A, NotUsed])(
implicit am: ActorMaterializer): (UniqueKillSwitch, Future[Done]) = {
source
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run
}
var streamHandler : Option[UniqueKillSwitch] = None
override def receive: Receive = super.receive orElse {
case StartProducerStream(publisherActor, DefaultMessage) =>
streamHandler = Some(producerStream[DefaultMessage](cfg.producerProps))
context.become(active)
case other => println(s"SHIT !! Got unknown message: $other")
}
以及主动行为:
val active: PartialFunction[Any, Unit] =
case DestroyStream =>
if(streamHandler.isDefined)
streamHandler.shutdown
我正在使用 Akka Streams Kafka 库与 Kafka 代理交互。我有以下带有 UnBounded 缓冲区的流定义:
def producerStream[T: MessageType](producerProperties: Map[String, String]) = {
val streamSource = source[T](producerProperties)
val streamFlow = flow[T](producerProperties)
val streamSink = sink(producerProperties)
streamSource.via(streamFlow).to(streamSink).run()
}
我从 Actor 实例中调用 producerStream。我有一条消息类型:
case class InitiateStream[T](publisher: ActorRef, anyMessage: T)
在我的 Actor 中,我有以下接收块:
override def receive: Receive = super.receive orElse {
case StartProducerStream(publisherActor, DefaultMessage) =>
producerStream[DefaultMessage](cfg.producerProps)
// context.become(active)
case other => println(s"SHIT !! Got unknown message: $other")
}
publisherActor 最终将获得应该推送到相应 Kafka 主题的消息。
现在我的问题是,我是否需要为关闭 producerStream 而烦恼?
我正在考虑在启动流后立即执行 context.become(active(producerStream)),在 active 方法中,我将根据 DestroyStream 消息处理流终止。这个有必要吗?大家怎么看?
如果您使用 Akka Kafka 使用者,只要主题中的事件可用,源就会无限期地发出事件。为什么需要关闭生产者?
我建议调用以下内容来启动您的来源:
def producerSource[T: MessageType](producerProperties: Map[String, String]) = {
val streamSource = source[T](producerProperties)
val streamFlow = flow[T](producerProperties)
val streamSink = sink(producerProperties)
streamSource.via(streamFlow)
}
def startStream[A](source: Source[A, NotUsed])(
implicit am: ActorMaterializer): (UniqueKillSwitch, Future[Done]) = {
source
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run
}
var streamHandler : Option[UniqueKillSwitch] = None
override def receive: Receive = super.receive orElse {
case StartProducerStream(publisherActor, DefaultMessage) =>
streamHandler = Some(producerStream[DefaultMessage](cfg.producerProps))
context.become(active)
case other => println(s"SHIT !! Got unknown message: $other")
}
以及主动行为:
val active: PartialFunction[Any, Unit] =
case DestroyStream =>
if(streamHandler.isDefined)
streamHandler.shutdown