如何实现带跳过和条件停止的流
How to implement stream with skip and conditional stop
我尝试实现批处理。我的算法:
1) 首先我需要来自数据库的请求项目,初始 skip = 0
。如果没有项目则完全停止处理。
case class Item(i: Int)
def getItems(skip: Int): Future[Seq[Item]] = {
Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item))
}
2) 然后对每一项都做繁重的工作(parallelism = 4
)
def heavyJob(item: Item): Future[String] = Future {
Thread.sleep(1000)
item.i.toString + " done"
}
3) 所有项目处理完成后,使用 skip += 100
转到第 1 步
我在尝试什么:
val dbSource: Source[List[Item], _] = Source.fromFuture(getItems(0).map(_.toList))
val flattened: Source[Item, _] = dbSource.mapConcat(identity)
val procced: Source[String, _] = flattened.mapAsync(4)(item => heavyJob(item))
procced.runWith(Sink.onComplete(t => println("Complete: " + t.isSuccess)))
但是我不知道如何实现分页
skip
递增可以用 Iterator
作为值的基础来源来处理:
val skipIncrement = 100
val skipIterator : () => Iterator[Int] =
() => Iterator from (0, skipIncrement)
此迭代器然后可用于驱动 akka Source
获取项目并将继续处理直到查询 returns 一个空的 Seq
:
val databaseStillHasValues : Seq[Item] => Boolean =
(dbValues) => !dbValues.isEmpty
val itemSource : Source[Item, _] =
Source.fromIterator(skipIterator)
.mapAsync(1)(getItems)
.takeWhile(databaseStillHasValues)
.mapConcat(identity)
heavyJob
可以在Flow
中使用:
val heavyParallelism = 4
val heavyFlow : Flow[Item, String, _] =
Flow[Item].mapAsync(heavyParallelism)(heavyJob)
最后,Source和Flow可以附加到Sink
:
val printSink = Sink[String].foreach(t => println(s"Complete: ${t.isSuccess}"))
itemSource.via(heavyFlow)
.runWith(printSink)
我尝试实现批处理。我的算法:
1) 首先我需要来自数据库的请求项目,初始 skip = 0
。如果没有项目则完全停止处理。
case class Item(i: Int)
def getItems(skip: Int): Future[Seq[Item]] = {
Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item))
}
2) 然后对每一项都做繁重的工作(parallelism = 4
)
def heavyJob(item: Item): Future[String] = Future {
Thread.sleep(1000)
item.i.toString + " done"
}
3) 所有项目处理完成后,使用 skip += 100
我在尝试什么:
val dbSource: Source[List[Item], _] = Source.fromFuture(getItems(0).map(_.toList))
val flattened: Source[Item, _] = dbSource.mapConcat(identity)
val procced: Source[String, _] = flattened.mapAsync(4)(item => heavyJob(item))
procced.runWith(Sink.onComplete(t => println("Complete: " + t.isSuccess)))
但是我不知道如何实现分页
skip
递增可以用 Iterator
作为值的基础来源来处理:
val skipIncrement = 100
val skipIterator : () => Iterator[Int] =
() => Iterator from (0, skipIncrement)
此迭代器然后可用于驱动 akka Source
获取项目并将继续处理直到查询 returns 一个空的 Seq
:
val databaseStillHasValues : Seq[Item] => Boolean =
(dbValues) => !dbValues.isEmpty
val itemSource : Source[Item, _] =
Source.fromIterator(skipIterator)
.mapAsync(1)(getItems)
.takeWhile(databaseStillHasValues)
.mapConcat(identity)
heavyJob
可以在Flow
中使用:
val heavyParallelism = 4
val heavyFlow : Flow[Item, String, _] =
Flow[Item].mapAsync(heavyParallelism)(heavyJob)
最后,Source和Flow可以附加到Sink
:
val printSink = Sink[String].foreach(t => println(s"Complete: ${t.isSuccess}"))
itemSource.via(heavyFlow)
.runWith(printSink)