Akka 流:将新的 publishers/subscribers 附加到 Flow
Akka streams: attach new publishers/subscribers to Flow
我正在构建一个 Akka 应用程序,并希望将某些参与者的 FSM 状态转换公开给外部消费者。 (最终目标是能够将状态转换消息推送到 websocket,以便可以实时查看它们。)
根据文档,Combining dynamic stages to build a simple Publish-Subscribe service,我似乎需要公开代表发布-订阅渠道的流,以便消费者和生产者可以使用它。
我遇到问题的部分是将新源附加到流,这样每个新生成的 actor 都会将其状态转换发布到源。另一个问题是向流添加新的接收器(最后,这将是 websockets,但出于测试目的,它可以是任何新的接收器)。
首先,我连接了一个 MergeHub 和一个 BroadcastHub 以形成一个 "channel",然后从物化的接收器和源创建流:
val orderFlow: Flow[String, String, NotUsed] = {
val (sink, source) = MergeHub.source[String](16)
.toMat(BroadcastHub.sink(256))(Keep.both).run()
Flow.fromSinkAndSource(sink, source)
}
问题是我如何动态地添加新的生产者和消费者到这个流中?有什么想法吗?
对于这个特殊问题,我不会使用 akka-stream
。您描述的多播类型 pub-sub 更适合原始 Actor
消息传递和 EventStream
。
在某些情况下,我是 akka-stream 的超级粉丝,但在这种情况下,我认为您是在尝试将方钉穿过圆孔。
我在我的一个项目中使用以下解决方案来处理来自多个请求处理器的 websocket 请求,这些处理器可以产生响应流或提供无限订阅。
// requests coming from websocket, it could be any source, it's doesn't matter
val requests: Source[Request, NotUsed] = ...
// the request processing here can provide endles stream of responses
val requestProcessing: Flow[Request, Response, NotUsed] = ...
val (outSink, outSource) =
MergeHub
.source[Result](perProducerBufferSize = 4)
.toMat(BroadcastHub.sink(bufferSize = 32))(Keep.both)
.run()
Source.tick(Duration.Zero, KeepAliveInterval, ConnectionKeepAlive)
.to(outSink)
.run()
requests.fold {
case State(state, AuthRequest(r)) if checkAuth(r) =>
Source.single(AuthenticationAck).to(outSink)
state.copy(isAuthenticated = true)
case State(state, AuthRequest(r)) =>
Source.single(AuthenticationFailedError).to(outSink)
state
case State(state, request) if s.isAuthenticated =>
// here the most of busines
Source.single(request).via(requestProcessing).to(outSink)
state
case State(state, _) =>
Source.single(NonAuthorizedError).to(outSink)
state
}.toMat(Sink.ignore)(Keep.right)
outSource.runForeach { response =>
// here we get the stream of responses mixed from all requests
}
outSource.runForeach { response =>
// of course, we could have as many subscribers as we need
}
希望对您有所帮助:)
我正在构建一个 Akka 应用程序,并希望将某些参与者的 FSM 状态转换公开给外部消费者。 (最终目标是能够将状态转换消息推送到 websocket,以便可以实时查看它们。)
根据文档,Combining dynamic stages to build a simple Publish-Subscribe service,我似乎需要公开代表发布-订阅渠道的流,以便消费者和生产者可以使用它。
我遇到问题的部分是将新源附加到流,这样每个新生成的 actor 都会将其状态转换发布到源。另一个问题是向流添加新的接收器(最后,这将是 websockets,但出于测试目的,它可以是任何新的接收器)。
首先,我连接了一个 MergeHub 和一个 BroadcastHub 以形成一个 "channel",然后从物化的接收器和源创建流:
val orderFlow: Flow[String, String, NotUsed] = {
val (sink, source) = MergeHub.source[String](16)
.toMat(BroadcastHub.sink(256))(Keep.both).run()
Flow.fromSinkAndSource(sink, source)
}
问题是我如何动态地添加新的生产者和消费者到这个流中?有什么想法吗?
对于这个特殊问题,我不会使用 akka-stream
。您描述的多播类型 pub-sub 更适合原始 Actor
消息传递和 EventStream
。
在某些情况下,我是 akka-stream 的超级粉丝,但在这种情况下,我认为您是在尝试将方钉穿过圆孔。
我在我的一个项目中使用以下解决方案来处理来自多个请求处理器的 websocket 请求,这些处理器可以产生响应流或提供无限订阅。
// requests coming from websocket, it could be any source, it's doesn't matter
val requests: Source[Request, NotUsed] = ...
// the request processing here can provide endles stream of responses
val requestProcessing: Flow[Request, Response, NotUsed] = ...
val (outSink, outSource) =
MergeHub
.source[Result](perProducerBufferSize = 4)
.toMat(BroadcastHub.sink(bufferSize = 32))(Keep.both)
.run()
Source.tick(Duration.Zero, KeepAliveInterval, ConnectionKeepAlive)
.to(outSink)
.run()
requests.fold {
case State(state, AuthRequest(r)) if checkAuth(r) =>
Source.single(AuthenticationAck).to(outSink)
state.copy(isAuthenticated = true)
case State(state, AuthRequest(r)) =>
Source.single(AuthenticationFailedError).to(outSink)
state
case State(state, request) if s.isAuthenticated =>
// here the most of busines
Source.single(request).via(requestProcessing).to(outSink)
state
case State(state, _) =>
Source.single(NonAuthorizedError).to(outSink)
state
}.toMat(Sink.ignore)(Keep.right)
outSource.runForeach { response =>
// here we get the stream of responses mixed from all requests
}
outSource.runForeach { response =>
// of course, we could have as many subscribers as we need
}
希望对您有所帮助:)