如何在流中使用确认语义?

How can I use acknowledgment semantics in a Flow?

我想设计一个流程,它只会在 ActorRef 收到一些确认消息时从上游拉取。

我的用例是使用一些静态集群演员来做工作。我有一个监护人,负责维护是否有可用的工人。当一个 actor 可用时,它会发布一条消息。我希望我的 Flow 知道此消息以提取作业并将其推送到下游。否则会反压。

阅读文档时,GraphStage API 似乎没有一些反应组件,依赖于 InHandlers 进行自定义处理:

setHandler(in, new InHandler {
  override def onPush(): Unit = {
    println(grab(in))
    pull(in)
  }
})

我可能会在这个方法中轮询以更新可用参与者的数量,但这不是被动的。

Akka默认支持Flows中的确认吗?

推不拉

一旦 Actor 空闲,您可以让 Source 发送数据,而不是让流信号请求上游。

您可以创建一个 Source,它将具体化为一个 ActorRef,它将收到一个免费 Actor 的通知:

object ActorIsFreeMessage

val source : Source[ActorIsFreeMessage, ActorRef] = Source.actorRef(???, ???)

然后您可以附加一个 Flow,一旦源收到消息,它将 "pull a job":

type Job = ???

val pullAJob : () => Job = ???

val jobFlow : Flow[ActorIsFreeMessage, Job] = 
  Flow[ActorIsFreeMessage].map[Job](_ => pullAJob())

val jobSource : Source[Job, ActorRef] = source via jobFlow

一旦这个新的 Source 连接到 Sink,其他 Actor 就可以向物化的 ActorRef:

发送消息
val jobSink : Sink[Job, ActorRef] = ???

val streamRef = jobSource.to(jobSink).run()

//inside of the Actor
streamRef ! ActorIsFreeMessage