How to implement a stream with skip and conditional stop

I'm trying to implement batch processing. My algorithm:

1) First, I request items from the database, starting with skip = 0. If there are no items, processing stops completely.

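  import scala.concurrent.{ExecutionContext, Future}
  import ExecutionContext.Implicits.global

  case class Item(i: Int)

  // Mock database query: pages of 100 items while skip < 756, then an empty page
  def getItems(skip: Int): Future[Seq[Item]] = {
    Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item))
  }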

2) Then I run a heavy job on each item (parallelism = 4):

  // Simulates expensive work: blocks for one second per item
  def heavyJob(item: Item): Future[String] = Future {
    Thread.sleep(1000)
    item.i.toString + " done"
  }

3) Once all items have been processed, set skip += 100 and go back to step 1.
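To make steps 1-3 concrete, here is a minimal sketch of the same loop with plain Futures and no Akka. It reuses the mock query from step 1 and swaps in a hypothetical `heavyJob` without `Thread.sleep` so it finishes quickly. "Parallelism = 4" is approximated by processing each page in consecutive groups of 4, which is simpler than (and not identical to) keeping 4 jobs in flight at all times.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object PagingLoop {
  case class Item(i: Int)

  // Same mock as in the question: full pages of 100 while skip < 756, then an empty page
  def getItems(skip: Int): Future[Seq[Item]] =
    Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item(_)))

  // Hypothetical fast stand-in for heavyJob (no Thread.sleep) so the sketch finishes quickly
  def heavyJob(item: Item): Future[String] = Future(item.i.toString + " done")

  def processAll(skip: Int = 0, pageSize: Int = 100, parallelism: Int = 4): Future[Seq[String]] =
    getItems(skip).flatMap { items =>
      // Step 1: an empty page means there is nothing left, so stop
      if (items.isEmpty) Future.successful(Seq.empty[String])
      else {
        // Step 2: run one group of `parallelism` jobs at a time, keeping results in order
        val pageDone =
          items.grouped(parallelism).foldLeft(Future.successful(Vector.empty[String])) {
            (acc, group) => acc.flatMap(done => Future.traverse(group)(heavyJob).map(done ++ _))
          }
        // Step 3: after the whole page is done, continue with skip + pageSize
        pageDone.flatMap(done => processAll(skip + pageSize, pageSize, parallelism).map(done ++ _))
      }
    }

  def main(args: Array[String]): Unit = {
    val results = Await.result(processAll(), 30.seconds)
    println(results.size) // prints 800
  }
}
```

With this mock, skip values 0 through 700 return full pages of 100 and skip 800 returns an empty page, so 800 results are produced before the loop stops.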

What I have tried so far:

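import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

// With Akka 2.6+, an implicit ActorSystem also provides the Materializer needed by runWith
implicit val system: ActorSystem = ActorSystem("batch")

val dbSource: Source[List[Item], _] = Source.fromFuture(getItems(0).map(_.toList))

val flattened: Source[Item, _] = dbSource.mapConcat(identity)

val processed: Source[String, _] = flattened.mapAsync(4)(item => heavyJob(item))

// Only the first page (skip = 0) is ever fetched here
processed.runWith(Sink.onComplete(t => println("Complete: " + t.isSuccess)))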

But I don't know how to implement the pagination.

The skip increment can be handled by using an Iterator as the underlying source of skip values:

val skipIncrement = 100

val skipIterator : () => Iterator[Int] =
  () => Iterator.from(0, skipIncrement)
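A quick check of what `skipIterator` yields. It is a function rather than a single `Iterator`, which matches `Source.fromIterator`, whose parameter has type `() => Iterator[T]`: each run of the stream can start from a fresh iterator at 0. The wrapping object name is only for illustration.

```scala
object SkipIteratorDemo {
  val skipIncrement = 100

  // A factory function: every call returns a new iterator starting again at 0
  val skipIterator: () => Iterator[Int] =
    () => Iterator.from(0, skipIncrement)

  def main(args: Array[String]): Unit = {
    println(skipIterator().take(5).toList) // prints List(0, 100, 200, 300, 400)
    println(skipIterator().next())         // prints 0, because this is a fresh iterator
  }
}
```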

This iterator can then drive an Akka Source that fetches the items and keeps processing until a query returns an empty Seq:

val databaseStillHasValues : Seq[Item] => Boolean =
  (dbValues) => dbValues.nonEmpty

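val itemSource : Source[Item, _] =
  Source.fromIterator(skipIterator)        // skip values 0, 100, 200, ...
        .mapAsync(1)(getItems)             // one database query at a time
        .takeWhile(databaseStillHasValues) // stop at the first empty page
        // mapConcat expects an immutable Iterable; Seq is only immutable on Scala 2.13,
        // so convert each page explicitly before flattening it into single items
        .mapConcat(_.toList)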
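The stop condition can be checked without Akka. Below, the same pipeline shape is written with a plain, synchronous `Iterator`; `getItemsSync` is a hypothetical blocking version of the mock query. It shows that `takeWhile` ends the sequence at the first empty page, so no further pages are consumed.

```scala
object PagingSemantics {
  case class Item(i: Int)

  // Hypothetical synchronous version of the question's mock query
  def getItemsSync(skip: Int): Seq[Item] =
    (skip until (skip + (if (skip < 756) 100 else 0))).map(Item(_))

  // Same shape as itemSource: skip values -> pages -> stop at first empty page -> items
  def allItems: Iterator[Item] =
    Iterator.from(0, 100)
      .map(getItemsSync)
      .takeWhile(_.nonEmpty)
      .flatMap(page => page)

  def main(args: Array[String]): Unit = {
    val items = allItems.toList
    println(items.size)   // prints 800
    println(items.last.i) // prints 799
  }
}
```

In the Akka version, `mapAsync(1)` additionally keeps at most one query in flight, so pages are requested one after another rather than concurrently.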

heavyJob can then be used in a Flow:

val heavyParallelism = 4

val heavyFlow : Flow[Item, String, _] = 
  Flow[Item].mapAsync(heavyParallelism)(heavyJob)

Finally, the Source and the Flow can be attached to a Sink:

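// Sink.foreach prints each result as it arrives
val printSink = Sink.foreach[String](println)

itemSource.via(heavyFlow)
          .runWith(printSink)
          // runWith returns the sink's Future[Done], which completes when the stream finishes
          .onComplete(t => println(s"Complete: ${t.isSuccess}"))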
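An alternative worth considering, not part of the approach above, is `Source.unfoldAsync`: it threads the current skip through the stream as state and completes the stream as soon as the function returns `None`, which replaces both the iterator and `takeWhile`. This is a minimal sketch assuming Akka 2.6 (where an implicit `ActorSystem` also supplies the `Materializer`) and the question's mock query; `pages` is a hypothetical helper name.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UnfoldPaging {
  case class Item(i: Int)

  // The question's mock query, with the ExecutionContext passed implicitly
  def getItems(skip: Int)(implicit ec: ExecutionContext): Future[Seq[Item]] =
    Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item(_)))

  // Hypothetical helper: the state is the current skip. A non-empty page is emitted and the
  // next state is skip + pageSize; an empty page yields None, which completes the stream.
  def pages(pageSize: Int = 100)(implicit ec: ExecutionContext): Source[Seq[Item], NotUsed] =
    Source.unfoldAsync(0) { skip =>
      getItems(skip).map(page => if (page.isEmpty) None else Some((skip + pageSize, page)))
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("paging")
    import system.dispatcher

    val done = pages()
      .mapConcat(_.toList)                                    // pages -> single items
      .mapAsync(4)(item => Future(item.i.toString + " done")) // stand-in for heavyJob
      .runWith(Sink.seq)

    println(Await.result(done, 30.seconds).size) // prints 800
    system.terminate()
  }
}
```

This keeps the "next skip" logic and the stop condition in one place, while the rest of the pipeline (`mapConcat`, `mapAsync(4)` with `heavyJob`, the sink) stays the same.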