如何制作akka流源队列FIFO?
how to make akka stream source queue FIFO?
我正在使用akka开发一个股票交易系统。
我向 TradeQueue 提供订单簿,如下所示:
val tradeQueue = Source.queue[TradeTask](1, OverflowStrategy.backpressure)
.map(task=>{
println("TradeTask Start:"+task)
task
})
.via(ProcessA)
.via(ProcessC)
.via(ProcessC)
.toMat(Sink.foreach(task => {
log.info("TradeTask finish:"+task)
}))(Keep.left).run()
for (item <- 1 to 100) {
val task = TradeTask(item)
tradeQueue.offer(task)
}
但是顺序乱了
像这样:
贸易任务开始:贸易任务(1)
贸易任务开始:贸易任务(2)
贸易任务完成:贸易任务(1)
贸易任务完成:贸易任务(2)
但是我想要FIFO和元素在上一个完成之前入队,像这样
贸易任务开始:贸易任务(1)
贸易任务完成:贸易任务(1)
贸易任务开始:贸易任务(2)
贸易任务完成:贸易任务(2)
怎么做?谢谢
已经是先进先出
你的问题已经证明队列是“(F)irst (I)n (F)irst (O)ut”。如输出所示,第一个进入流的元素 TradeTask(1)
是 Sink
:
处理的第一个元素
TradeTask Start:TradeTask(1) // <-- FIRST IN
TradeTask Start:TradeTask(2)
TradeTask finish:TradeTask(1) // <-- FIRST OUT
TradeTask finish:TradeTask(2)
间接回答
你问的问题和akka-stream的purpose/usage完全相反。您问的是如何创建一个 akka 流来串行执行所有处理,而不是异步处理。
akka的重点是asynchronous processing & communication:
Welcome to Akka, a set of open-source libraries for designing
scalable, resilient systems that span processor cores and networks.
如果您希望一次完成一个处理,那么为什么首先要使用 akka? Iterable
元素的同步处理很容易完成,无需 akka,使用 scala 集合和 for-comprehensions:
val functionA : Task => Task = ???
val functionB : Task => Task = ???
val functionC : Task => Task = ???
val logTask : Task => Unit =
(task) => log.info("TradeTask finish:" + task)
for {
item <- 1 to 100
task <- TradeTask(item)
aResult <- functionA(task)
bResult <- functionB(aResult)
cResult <- functionC(bResult)
} {
logTask(cResult)
}
同样,您可以使用函数组合和简化的迭代:
val compositeFunction : Int => Unit =
TradeTask.apply andThen functionA andThen functionB andThen functionC andThen logTask
(1 to 100) foreach compositeFunction
我正在使用akka开发一个股票交易系统。 我向 TradeQueue 提供订单簿,如下所示:
val tradeQueue = Source.queue[TradeTask](1, OverflowStrategy.backpressure)
.map(task=>{
println("TradeTask Start:"+task)
task
})
.via(ProcessA)
.via(ProcessC)
.via(ProcessC)
.toMat(Sink.foreach(task => {
log.info("TradeTask finish:"+task)
}))(Keep.left).run()
for (item <- 1 to 100) {
val task = TradeTask(item)
tradeQueue.offer(task)
}
但是顺序乱了
像这样:
贸易任务开始:贸易任务(1)
贸易任务开始:贸易任务(2)
贸易任务完成:贸易任务(1)
贸易任务完成:贸易任务(2)
但是我想要FIFO和元素在上一个完成之前入队,像这样
贸易任务开始:贸易任务(1)
贸易任务完成:贸易任务(1)
贸易任务开始:贸易任务(2)
贸易任务完成:贸易任务(2)
怎么做?谢谢
已经是先进先出
你的问题已经证明队列是“(F)irst (I)n (F)irst (O)ut”。如输出所示,第一个进入流的元素 TradeTask(1)
是 Sink
:
TradeTask Start:TradeTask(1) // <-- FIRST IN
TradeTask Start:TradeTask(2)
TradeTask finish:TradeTask(1) // <-- FIRST OUT
TradeTask finish:TradeTask(2)
间接回答
你问的问题和akka-stream的purpose/usage完全相反。您问的是如何创建一个 akka 流来串行执行所有处理,而不是异步处理。
akka的重点是asynchronous processing & communication:
Welcome to Akka, a set of open-source libraries for designing scalable, resilient systems that span processor cores and networks.
如果您希望一次完成一个处理,那么为什么首先要使用 akka? Iterable
元素的同步处理很容易完成,无需 akka,使用 scala 集合和 for-comprehensions:
val functionA : Task => Task = ???
val functionB : Task => Task = ???
val functionC : Task => Task = ???
val logTask : Task => Unit =
(task) => log.info("TradeTask finish:" + task)
for {
item <- 1 to 100
task <- TradeTask(item)
aResult <- functionA(task)
bResult <- functionB(aResult)
cResult <- functionC(bResult)
} {
logTask(cResult)
}
同样,您可以使用函数组合和简化的迭代:
val compositeFunction : Int => Unit =
TradeTask.apply andThen functionA andThen functionB andThen functionC andThen logTask
(1 to 100) foreach compositeFunction