如何在流中使用确认语义?
How can I use acknowledgment semantics in a Flow?
我想设计一个流程,它只会在 ActorRef
收到一些确认消息时从上游拉取。
我的用例是使用一些静态集群演员来做工作。我有一个监护人,负责维护是否有可用的工人。当一个 actor 可用时,它会发布一条消息。我希望我的 Flow 知道此消息以提取作业并将其推送到下游。否则会反压。
阅读文档时,GraphStage
API 似乎没有一些反应组件,依赖于 InHandler
s 进行自定义处理:
setHandler(in, new InHandler {
override def onPush(): Unit = {
println(grab(in))
pull(in)
}
})
我可能会在这个方法中轮询以更新可用参与者的数量,但这不是被动的。
Akka默认支持Flows中的确认吗?
推不拉
一旦 Actor
空闲,您可以让 Source
发送数据,而不是让流信号请求上游。
您可以创建一个 Source,它将具体化为一个 ActorRef
,它将收到一个免费 Actor 的通知:
object ActorIsFreeMessage
val source : Source[ActorIsFreeMessage, ActorRef] = Source.actorRef(???, ???)
然后您可以附加一个 Flow
,一旦源收到消息,它将 "pull a job":
type Job = ???
val pullAJob : () => Job = ???
val jobFlow : Flow[ActorIsFreeMessage, Job] =
Flow[ActorIsFreeMessage].map[Job](_ => pullAJob())
val jobSource : Source[Job, ActorRef] = source via jobFlow
一旦这个新的 Source 连接到 Sink,其他 Actor 就可以向物化的 ActorRef
:
发送消息
val jobSink : Sink[Job, ActorRef] = ???
val streamRef = jobSource.to(jobSink).run()
//inside of the Actor
streamRef ! ActorIsFreeMessage
我想设计一个流程,它只会在 ActorRef
收到一些确认消息时从上游拉取。
我的用例是使用一些静态集群演员来做工作。我有一个监护人,负责维护是否有可用的工人。当一个 actor 可用时,它会发布一条消息。我希望我的 Flow 知道此消息以提取作业并将其推送到下游。否则会反压。
阅读文档时,GraphStage
API 似乎没有一些反应组件,依赖于 InHandler
s 进行自定义处理:
setHandler(in, new InHandler {
override def onPush(): Unit = {
println(grab(in))
pull(in)
}
})
我可能会在这个方法中轮询以更新可用参与者的数量,但这不是被动的。
Akka默认支持Flows中的确认吗?
推不拉
一旦 Actor
空闲,您可以让 Source
发送数据,而不是让流信号请求上游。
您可以创建一个 Source,它将具体化为一个 ActorRef
,它将收到一个免费 Actor 的通知:
object ActorIsFreeMessage
val source : Source[ActorIsFreeMessage, ActorRef] = Source.actorRef(???, ???)
然后您可以附加一个 Flow
,一旦源收到消息,它将 "pull a job":
type Job = ???
val pullAJob : () => Job = ???
val jobFlow : Flow[ActorIsFreeMessage, Job] =
Flow[ActorIsFreeMessage].map[Job](_ => pullAJob())
val jobSource : Source[Job, ActorRef] = source via jobFlow
一旦这个新的 Source 连接到 Sink,其他 Actor 就可以向物化的 ActorRef
:
val jobSink : Sink[Job, ActorRef] = ???
val streamRef = jobSource.to(jobSink).run()
//inside of the Actor
streamRef ! ActorIsFreeMessage