如何制作akka流源队列FIFO?

how to make akka stream source queue FIFO?

我正在使用akka开发一个股票交易系统。 我向 TradeQueue 提供订单簿,如下所示:

val tradeQueue = Source.queue[TradeTask](1, OverflowStrategy.backpressure)
.map(task=>{
  println("TradeTask Start:"+task)
  task
})
.via(ProcessA)
.via(ProcessC)
.via(ProcessC)
.toMat(Sink.foreach(task => {
     log.info("TradeTask finish:"+task)
 }))(Keep.left).run()


 for (item <- 1 to 100) {
    val task = TradeTask(item)
    tradeQueue.offer(task)
 }   

但是顺序乱了

像这样:

贸易任务开始:贸易任务(1)

贸易任务开始:贸易任务(2)

贸易任务完成:贸易任务(1)

贸易任务完成:贸易任务(2)

但是我想要FIFO和元素在上一个完成之前入队,像这样

贸易任务开始:贸易任务(1)

贸易任务完成:贸易任务(1)

贸易任务开始:贸易任务(2)

贸易任务完成:贸易任务(2)

怎么做?谢谢

已经是先进先出

你的问题已经证明队列是“(F)irst (I)n (F)irst (O)ut”。如输出所示,第一个进入流的元素 TradeTask(1)Sink:

处理的第一个元素
TradeTask Start:TradeTask(1)    // <-- FIRST IN

TradeTask Start:TradeTask(2)

TradeTask finish:TradeTask(1)   // <-- FIRST OUT

TradeTask finish:TradeTask(2)

间接回答

你问的问题和akka-stream的purpose/usage完全相反。您问的是如何创建一个 akka 流来串行执行所有处理,而不是异步处理。

akka的重点是asynchronous processing & communication:

Welcome to Akka, a set of open-source libraries for designing scalable, resilient systems that span processor cores and networks.

如果您希望一次完成一个处理,那么为什么首先要使用 akka? Iterable 元素的同步处理很容易完成,无需 akka,使用 scala 集合和 for-comprehensions:

val functionA : Task => Task = ???
val functionB : Task => Task = ???
val functionC : Task => Task = ???

val logTask : Task => Unit = 
  (task) => log.info("TradeTask finish:" + task)

for {
  item    <- 1 to 100
  task    <- TradeTask(item)
  aResult <- functionA(task)
  bResult <- functionB(aResult)
  cResult <- functionC(bResult)
} { 
  logTask(cResult)
}

同样,您可以使用函数组合和简化的迭代:

val compositeFunction : Int => Unit = 
  TradeTask.apply andThen functionA andThen functionB andThen functionC andThen logTask

(1 to 100) foreach compositeFunction